From: andrewor14@apache.org
To: commits@spark.apache.org
Subject: git commit: [SPARK-3795] Heuristics for dynamically scaling executors
Date: Thu, 30 Oct 2014 00:49:28 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master e7fd80413 -> 8d59b37b0


[SPARK-3795] Heuristics for dynamically scaling executors

This is part of a bigger effort to provide elastic scaling of executors within a Spark application ([SPARK-3174](https://issues.apache.org/jira/browse/SPARK-3174)). This PR does not provide any functionality by itself; it is a skeleton that is missing a mechanism to be added later in [SPARK-3822](https://issues.apache.org/jira/browse/SPARK-3822).

Comments and feedback are most welcome. For those of you reviewing this in detail, I highly recommend doing it through your favorite IDE instead of through the diff here.

Author: Andrew Or

Closes #2746 from andrewor14/scaling-heuristics and squashes the following commits:

8a4fdaa [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
e045df8 [Andrew Or] Add warning message (minor)
dfa31ec [Andrew Or] Fix tests
c0becc4 [Andrew Or] Merging with SPARK-3822
4784f93 [Andrew Or] Reword an awkward log message
181f27f [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
c79e907 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
4672b90 [Andrew Or] It's nano time.
a6a30f2 [Andrew Or] Do not allow min/max executors of 0
c60ec33 [Andrew Or] Rewrite test logic with clocks
b00b680 [Andrew Or] Fix style
c3caa65 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
7f9da14 [Andrew Or] Factor out logic to verify bounds on # executors (minor)
f279019 [Andrew Or] Add time mocking tests for polling loop
685e347 [Andrew Or] Factor out clock in polling loop to facilitate testing
3cea7f7 [Andrew Or] Use PrivateMethodTester to keep original class private
3156d81 [Andrew Or] Update comments and exception messages
92f36f9 [Andrew Or] Address minor review comments
abdea61 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
2aefd09 [Andrew Or] Correct listener behavior
9fe6e44 [Andrew Or] Rename variables and configs + update comments and log messages
149cc32 [Andrew Or] Fix style
254c958 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
5ff829b [Andrew Or] Add tests for ExecutorAllocationManager
19c6c4b [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
5896515 [Andrew Or] Move ExecutorAllocationManager out of scheduler package
9ca8945 [Andrew Or] Rewrite callbacks through the listener interface
5e336b9 [Andrew Or] Remove code from backend to avoid conflict with SPARK-3822
092d1fd [Andrew Or] Remove timeout logic for pending requests
1309fab [Andrew Or] Request executors by specifying the number pending
8bc0e9d [Andrew Or] Add logic to expire pending requests after timeouts
b750ee1 [Andrew Or] Express timers in terms of expiration times + remove retry logic
7f8dd47 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
9d516cc [Andrew Or] Bug fix: Actually trigger the add timer / add retry timer
44f1832 [Andrew Or] Rename configs to include time units
eaae7ef [Andrew Or] Address various review comments
6f8be6c [Andrew Or] Beef up comments on what each of the timers mean
baaa403 [Andrew Or] Simplify variable names (minor)
42beec8 [Andrew Or] Reset whether the add threshold is crossed on cancellation
9bcc0bc [Andrew Or] ExecutorScalingManager -> ExecutorAllocationManager
2784398 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
5a97d9e [Andrew Or] Log retry attempts in INFO + clean up logging
2f55c9f [Andrew Or] Do not keep requesting executors even after max attempts
0acd1cb [Andrew Or] Rewrite timer logic with polling
b3c7d44 [Andrew Or] Start the retry timer for adding executors at the right time
9b5f2ea [Andrew Or] Wording changes in comments and log messages
c2203a5 [Andrew Or] Simplify code to access the scheduler backend
e519d08 [Andrew Or] Simplify initialization code
2cc87a7 [Andrew Or] Add retry logic for removing executors
d0b34a6 [Andrew Or] Add retry logic for adding executors
9cc4649 [Andrew Or] Simplifying synchronization logic
67c03c7 [Andrew Or] Correct semantics of adding executors + update comments
6c48ab0 [Andrew Or] Update synchronization comment
8901900 [Andrew Or] Simplify remove policy + change the semantics of add policy
1cc8444 [Andrew Or] Minor wording change
ae5b64a [Andrew Or] Add synchronization
20ec6b9 [Andrew Or] First cut implementation of removing executors dynamically
4077ae2 [Andrew Or] Minor code re-organization
6f1fa66 [Andrew Or] First cut implementation of adding executors dynamically
b2e6dcc [Andrew Or] Add skeleton interface for requesting / killing executors
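For reviewers who want to exercise the skeleton end to end: the sketch below shows how the feature would be switched on. This is a minimal sketch only -- the property keys are taken straight from this patch, but the numbers and app name are illustrative, and at this stage requests are only honored by YARN coarse-grained backends.

```scala
import org.apache.spark.SparkConf

// Property keys come from ExecutorAllocationManager in this patch;
// the concrete values and the app name are illustrative only.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo") // illustrative
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")  // required, must be > 0
  .set("spark.dynamicAllocation.maxExecutors", "20") // required, must be >= min
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "60")          // M, in seconds
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "60") // N, in seconds
  .set("spark.dynamicAllocation.executorIdleTimeout", "600")             // K, in seconds
// Passing this conf to a SparkContext launched against YARN starts the
// allocation manager's polling loop.
```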
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d59b37b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d59b37b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d59b37b

Branch: refs/heads/master
Commit: 8d59b37b02eb36f37bcefafb952519d7dca744ad
Parents: e7fd804
Author: Andrew Or
Authored: Wed Oct 29 17:48:59 2014 -0700
Committer: Andrew Or
Committed: Wed Oct 29 17:48:59 2014 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 462 +++++++++++++
 .../scala/org/apache/spark/SparkContext.scala   |  35 +-
 .../spark/ExecutorAllocationManagerSuite.scala  | 662 +++++++++++++++++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |   2 +-
 4 files changed, 1150 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
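Before the diff itself, a quick illustration of the add policy documented in ExecutorAllocationManager below: each round requests double the previous round, capped at the max. The standalone sketch mirrors the arithmetic in `addExecutors()` but shares no code with the patch; all names here are ours.

```scala
// Standalone sketch of the exponential ramp-up: each round doubles the
// request size until maxExecutors is reached (cf. addExecutors() below).
def rampUp(maxExecutors: Int): Seq[Int] = {
  var pending = 0 // executors requested so far
  var toAdd = 1   // size of the next request
  Iterator.continually {
    val actual = math.min(toAdd, maxExecutors - pending)
    toAdd = if (actual == toAdd) toAdd * 2 else 1 // reset once capped
    pending += actual
    actual
  }.takeWhile(_ > 0).toSeq
}

// rampUp(10) == Seq(1, 2, 4, 3) -- the same 1, 2, 4, 3 sequence the
// "add executors" test below expects before the limit of 10 is hit.
```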
http://git-wip-us.apache.org/repos/asf/spark/blob/8d59b37b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
new file mode 100644
index 0000000..b2cf022
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.mutable
+
+import org.apache.spark.scheduler._
+
+/**
+ * An agent that dynamically allocates and removes executors based on the workload.
+ *
+ * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
+ * the scheduler queue is not drained in M seconds, then new executors are added. If the queue
+ * persists for another N seconds, then more executors are added and so on. The number added
+ * in each round increases exponentially from the previous round until an upper bound on the
+ * number of executors has been reached.
+ *
+ * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
+ * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
+ * we may add more executors than we need just to remove them later. (2) Executors should be added
+ * quickly over time in case the maximum number of executors is very high. Otherwise, it will take
+ * a long time to ramp up under heavy workloads.
+ *
+ * The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not
+ * been scheduled to run any tasks, then it is removed.
+ *
+ * There is no retry logic in either case because we make the assumption that the cluster manager
+ * will eventually fulfill all requests it receives asynchronously.
+ *
+ * The relevant Spark properties include the following:
+ *
+ *   spark.dynamicAllocation.enabled - Whether this feature is enabled
+ *   spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
+ *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ *
+ *   spark.dynamicAllocation.schedulerBacklogTimeout (M) -
+ *     If there are backlogged tasks for this duration, add new executors
+ *
+ *   spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) -
+ *     If the backlog is sustained for this duration, add more executors
+ *     This is used only after the initial backlog timeout is exceeded
+ *
+ *   spark.dynamicAllocation.executorIdleTimeout (K) -
+ *     If an executor has been idle for this duration, remove it
+ */
+private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging {
+  import ExecutorAllocationManager._
+
+  private val conf = sc.conf
+
+  // Lower and upper bounds on the number of executors. These are required.
+  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
+  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+  verifyBounds()
+
+  // How long there must be backlogged tasks before an addition is triggered
+  private val schedulerBacklogTimeout = conf.getLong(
+    "spark.dynamicAllocation.schedulerBacklogTimeout", 60)
+
+  // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
+  private val sustainedSchedulerBacklogTimeout = conf.getLong(
+    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
+
+  // How long an executor must be idle before it is removed
+  private val removeThresholdSeconds = conf.getLong(
+    "spark.dynamicAllocation.executorIdleTimeout", 600)
+
+  // Number of executors to add in the next round
+  private var numExecutorsToAdd = 1
+
+  // Number of executors that have been requested but have not registered yet
+  private var numExecutorsPending = 0
+
+  // Executors that have been requested to be removed but have not been killed yet
+  private val executorsPendingToRemove = new mutable.HashSet[String]
+
+  // All known executors
+  private val executorIds = new mutable.HashSet[String]
+
+  // A timestamp of when an addition should be triggered, or NOT_SET if it is not set
+  // This is set when pending tasks are added but not scheduled yet
+  private var addTime: Long = NOT_SET
+
+  // A timestamp for each executor of when the executor should be removed, indexed by the ID
+  // This is set when an executor is no longer running a task, or when it first registers
+  private val removeTimes = new mutable.HashMap[String, Long]
+
+  // Polling loop interval (ms)
+  private val intervalMillis: Long = 100
+
+  // Whether we are testing this class. This should only be used internally.
+  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+
+  // Clock used to schedule when executors should be added and removed
+  private var clock: Clock = new RealClock
+
+  /**
+   * Verify that the lower and upper bounds on the number of executors are valid.
+   * If not, throw an appropriate exception.
+   */
+  private def verifyBounds(): Unit = {
+    if (minNumExecutors < 0 || maxNumExecutors < 0) {
+      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+    }
+    if (minNumExecutors == 0 || maxNumExecutors == 0) {
+      throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
+    }
+    if (minNumExecutors > maxNumExecutors) {
+      throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
+        s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
+    }
+  }
+
+  /**
+   * Use a different clock for this allocation manager. This is mainly used for testing.
+   */
+  def setClock(newClock: Clock): Unit = {
+    clock = newClock
+  }
+
+  /**
+   * Register for scheduler callbacks to decide when to add and remove executors.
+   */
+  def start(): Unit = {
+    val listener = new ExecutorAllocationListener(this)
+    sc.addSparkListener(listener)
+    startPolling()
+  }
+
+  /**
+   * Start the main polling thread that keeps track of when to add and remove executors.
+   */
+  private def startPolling(): Unit = {
+    val t = new Thread {
+      override def run(): Unit = {
+        while (true) {
+          try {
+            schedule()
+          } catch {
+            case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
+          }
+          Thread.sleep(intervalMillis)
+        }
+      }
+    }
+    t.setName("spark-dynamic-executor-allocation")
+    t.setDaemon(true)
+    t.start()
+  }
+
+  /**
+   * If the add time has expired, request new executors and refresh the add time.
+   * If the remove time for an existing executor has expired, kill the executor.
+   * This is factored out into its own method for testing.
+   */
+  private def schedule(): Unit = synchronized {
+    val now = clock.getTimeMillis
+    if (addTime != NOT_SET && now >= addTime) {
+      addExecutors()
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+    }
+
+    removeTimes.foreach { case (executorId, expireTime) =>
+      if (now >= expireTime) {
+        removeExecutor(executorId)
+        removeTimes.remove(executorId)
+      }
+    }
+  }
+
+  /**
+   * Request a number of executors from the cluster manager.
+   * If the cap on the number of executors is reached, give up and reset the
+   * number of executors to add next round instead of continuing to double it.
+   * Return the number actually requested.
+   */
+  private def addExecutors(): Int = synchronized {
+    // Do not request more executors if we have already reached the upper bound
+    val numExistingExecutors = executorIds.size + numExecutorsPending
+    if (numExistingExecutors >= maxNumExecutors) {
+      logDebug(s"Not adding executors because there are already ${executorIds.size} " +
+        s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
+      numExecutorsToAdd = 1
+      return 0
+    }
+
+    // Request executors with respect to the upper bound
+    val actualNumExecutorsToAdd =
+      if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) {
+        numExecutorsToAdd
+      } else {
+        maxNumExecutors - numExistingExecutors
+      }
+    val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
+    val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
+    if (addRequestAcknowledged) {
+      logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
+        s"tasks are backlogged (new desired total will be $newTotalExecutors)")
+      numExecutorsToAdd =
+        if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
+      numExecutorsPending += actualNumExecutorsToAdd
+      actualNumExecutorsToAdd
+    } else {
+      logWarning(s"Unable to reach the cluster manager " +
+        s"to request $actualNumExecutorsToAdd executors!")
+      0
+    }
+  }
+
+  /**
+   * Request the cluster manager to remove the given executor.
+   * Return whether the request is received.
+   */
+  private def removeExecutor(executorId: String): Boolean = synchronized {
+    // Do not kill the executor if we are not aware of it (should never happen)
+    if (!executorIds.contains(executorId)) {
+      logWarning(s"Attempted to remove unknown executor $executorId!")
+      return false
+    }
+
+    // Do not kill the executor again if it is already pending to be killed (should never happen)
+    if (executorsPendingToRemove.contains(executorId)) {
+      logWarning(s"Attempted to remove executor $executorId " +
+        s"when it is already pending to be removed!")
+      return false
+    }
+
+    // Do not kill the executor if we have already reached the lower bound
+    val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
+    if (numExistingExecutors - 1 < minNumExecutors) {
+      logInfo(s"Not removing idle executor $executorId because there are only " +
+        s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
+      return false
+    }
+
+    // Send a request to the backend to kill this executor
+    val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
+    if (removeRequestAcknowledged) {
+      logInfo(s"Removing executor $executorId because it has been idle for " +
+        s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
+      executorsPendingToRemove.add(executorId)
+      true
+    } else {
+      logWarning(s"Unable to reach the cluster manager to kill executor $executorId!")
+      false
+    }
+  }
+
+  /**
+   * Callback invoked when the specified executor has been added.
+   */
+  private def onExecutorAdded(executorId: String): Unit = synchronized {
+    if (!executorIds.contains(executorId)) {
+      executorIds.add(executorId)
+      executorIds.foreach(onExecutorIdle)
+      logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
+      if (numExecutorsPending > 0) {
+        numExecutorsPending -= 1
+        logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
+      }
+    } else {
+      logWarning(s"Duplicate executor $executorId has registered")
+    }
+  }
+
+  /**
+   * Callback invoked when the specified executor has been removed.
+   */
+  private def onExecutorRemoved(executorId: String): Unit = synchronized {
+    if (executorIds.contains(executorId)) {
+      executorIds.remove(executorId)
+      removeTimes.remove(executorId)
+      logInfo(s"Existing executor $executorId has been removed (new total is ${executorIds.size})")
+      if (executorsPendingToRemove.contains(executorId)) {
+        executorsPendingToRemove.remove(executorId)
+        logDebug(s"Executor $executorId is no longer pending to " +
+          s"be removed (${executorsPendingToRemove.size} left)")
+      }
+    } else {
+      logWarning(s"Unknown executor $executorId has been removed!")
+    }
+  }
+
+  /**
+   * Callback invoked when the scheduler receives new pending tasks.
+   * If not already set, this sets a future time that decides when executors should be added.
+   */
+  private def onSchedulerBacklogged(): Unit = synchronized {
+    if (addTime == NOT_SET) {
+      logDebug(s"Starting timer to add executors because pending tasks " +
+        s"are building up (to expire in $schedulerBacklogTimeout seconds)")
+      addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
+    }
+  }
+
+  /**
+   * Callback invoked when the scheduler queue is drained.
+   * This resets all variables used for adding executors.
+   */
+  private def onSchedulerQueueEmpty(): Unit = synchronized {
+    logDebug(s"Clearing timer to add executors because there are no more pending tasks")
+    addTime = NOT_SET
+    numExecutorsToAdd = 1
+  }
+
+  /**
+   * Callback invoked when the specified executor is no longer running any tasks.
+   * If the executor is not already marked as idle, this sets a future time that decides when
+   * the executor should be removed.
+   */
+  private def onExecutorIdle(executorId: String): Unit = synchronized {
+    if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+      logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
+        s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
+      removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
+    }
+  }
+
+  /**
+   * Callback invoked when the specified executor is now running a task.
+   * This resets all variables used for removing this executor.
+   */
+  private def onExecutorBusy(executorId: String): Unit = synchronized {
+    logDebug(s"Clearing idle timer for $executorId because it is now running a task")
+    removeTimes.remove(executorId)
+  }
+
+  /**
+   * A listener that notifies the given allocation manager of when to add and remove executors.
+   *
+   * This class is intentionally conservative in its assumptions about the relative ordering
+   * and consistency of events returned by the listener. For simplicity, it does not account
+   * for speculated tasks.
+   */
+  private class ExecutorAllocationListener(allocationManager: ExecutorAllocationManager)
+    extends SparkListener {
+
+    private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
+    private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
+    private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
+
+    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+      synchronized {
+        val stageId = stageSubmitted.stageInfo.stageId
+        val numTasks = stageSubmitted.stageInfo.numTasks
+        stageIdToNumTasks(stageId) = numTasks
+        allocationManager.onSchedulerBacklogged()
+      }
+    }
+
+    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
+      synchronized {
+        val stageId = stageCompleted.stageInfo.stageId
+        stageIdToNumTasks -= stageId
+        stageIdToTaskIndices -= stageId
+
+        // If this is the last stage with pending tasks, mark the scheduler queue as empty
+        // This is needed in case the stage is aborted for any reason
+        if (stageIdToNumTasks.isEmpty) {
+          allocationManager.onSchedulerQueueEmpty()
+        }
+      }
+    }
+
+    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+      val stageId = taskStart.stageId
+      val taskId = taskStart.taskInfo.taskId
+      val taskIndex = taskStart.taskInfo.index
+      val executorId = taskStart.taskInfo.executorId
+
+      // If this is the last pending task, mark the scheduler queue as empty
+      stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+      val numTasksScheduled = stageIdToTaskIndices(stageId).size
+      val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
+      if (numTasksScheduled == numTasksTotal) {
+        // No more pending tasks for this stage
+        stageIdToNumTasks -= stageId
+        if (stageIdToNumTasks.isEmpty) {
+          allocationManager.onSchedulerQueueEmpty()
+        }
+      }
+
+      // Mark the executor on which this task is scheduled as busy
+      executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
+      allocationManager.onExecutorBusy(executorId)
+    }
+
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+      val executorId = taskEnd.taskInfo.executorId
+      val taskId = taskEnd.taskInfo.taskId
+
+      // If the executor is no longer running any scheduled tasks, mark it as idle
+      if (executorIdToTaskIds.contains(executorId)) {
+        executorIdToTaskIds(executorId) -= taskId
+        if (executorIdToTaskIds(executorId).isEmpty) {
+          executorIdToTaskIds -= executorId
+          allocationManager.onExecutorIdle(executorId)
+        }
+      }
+    }
+
+    override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
+      val executorId = blockManagerAdded.blockManagerId.executorId
+      if (executorId != "") {
+        allocationManager.onExecutorAdded(executorId)
+      }
+    }
+
+    override def onBlockManagerRemoved(
+        blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
+      allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
+    }
+  }
+
+}
+
+private object ExecutorAllocationManager {
+  val NOT_SET = Long.MaxValue
+}
+
+/**
+ * An abstract clock for measuring elapsed time.
+ */
+private trait Clock {
+  def getTimeMillis: Long
+}
+
+/**
+ * A clock backed by a monotonically increasing time source.
+ * The time returned by this clock does not correspond to any notion of wall-clock time.
+ */
+private class RealClock extends Clock {
+  override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
+}
+
+/**
+ * A clock that allows the caller to customize the time.
+ * This is used mainly for testing.
+ */
+private class TestClock(startTimeMillis: Long) extends Clock {
+  private var time: Long = startTimeMillis
+  override def getTimeMillis: Long = time
+  def tick(ms: Long): Unit = { time += ms }
+}
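A note on the timer design above: the manager never schedules callbacks. It records absolute expiration timestamps (`addTime`, `removeTimes`) and lets the polling loop compare them against the pluggable `Clock`. The standalone sketch below illustrates that pattern in isolation -- the class and method names here are ours, not part of the patch.

```scala
import scala.collection.mutable

// Illustrative sketch of the expiration-timestamp pattern; `now` stands
// in for Clock.getTimeMillis, and the names are ours.
class ExpiryTimers(now: () => Long) {
  private val expirations = new mutable.HashMap[String, Long]

  // Idempotent, like onExecutorIdle: an already-armed timer is not reset.
  def arm(id: String, timeoutMs: Long): Unit =
    if (!expirations.contains(id)) expirations(id) = now() + timeoutMs

  // Like onExecutorBusy: cancel the timer outright.
  def disarm(id: String): Unit = expirations.remove(id)

  // Called from a polling loop, like schedule(): fire and clear expired timers.
  def poll(fire: String => Unit): Unit = {
    val expired = expirations.collect { case (id, t) if now() >= t => id }
    expired.foreach { id => fire(id); expirations.remove(id) }
  }
}
```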

http://git-wip-us.apache.org/repos/asf/spark/blob/8d59b37b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 40ea369..73668e8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -330,6 +330,15 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     } else None
   }
 
+  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
+  private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
+    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+      Some(new ExecutorAllocationManager(this))
+    } else {
+      None
+    }
+  executorAllocationManager.foreach(_.start())
+
   // At this point, all relevant SparkListeners have been registered, so begin releasing events
   listenerBus.start()
 
@@ -860,36 +869,42 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   /**
    * :: DeveloperApi ::
    * Request an additional number of executors from the cluster manager.
-   * This is currently only supported in Yarn mode.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def requestExecutors(numAdditionalExecutors: Int): Unit = {
+  def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors)
-      case _ => logWarning("Requesting executors is only supported in coarse-grained mode")
+      case b: CoarseGrainedSchedulerBackend =>
+        b.requestExecutors(numAdditionalExecutors)
+      case _ =>
+        logWarning("Requesting executors is only supported in coarse-grained mode")
+        false
     }
   }
 
   /**
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executors.
-   * This is currently only supported in Yarn mode.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
   */
   @DeveloperApi
-  def killExecutors(executorIds: Seq[String]): Unit = {
+  def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds)
-      case _ => logWarning("Killing executors is only supported in coarse-grained mode")
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutors(executorIds)
+      case _ =>
+        logWarning("Killing executors is only supported in coarse-grained mode")
+        false
    }
  }
 
   /**
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executor.
-   * This is currently only supported in Yarn mode.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def killExecutor(executorId: String): Unit = killExecutors(Seq(executorId))
+  def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
 
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION
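Since `requestExecutors` and `killExecutor(s)` now return `Boolean`, callers can tell whether the cluster manager acknowledged the request. A hedged usage sketch (it assumes an existing `SparkContext` named `sc` running against YARN, the only mode supported so far; the executor ID is illustrative):

```scala
// Sketch only: assumes an existing SparkContext `sc` on a YARN cluster.
if (sc.requestExecutors(2)) {
  println("cluster manager acknowledged the request for 2 more executors")
} else {
  println("request not delivered (non coarse-grained backend?)")
}

val acknowledged: Boolean = sc.killExecutor("3") // executor ID "3" is illustrative
```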

http://git-wip-us.apache.org/repos/asf/spark/blob/8d59b37b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
new file mode 100644
index 0000000..f0aa914
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.{FunSuite, PrivateMethodTester}
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.BlockManagerId
+
+/**
+ * Test add and remove behavior of ExecutorAllocationManager.
+ */
+class ExecutorAllocationManagerSuite extends FunSuite {
+  import ExecutorAllocationManager._
+  import ExecutorAllocationManagerSuite._
+
+  test("verify min/max executors") {
+    // No min or max
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test-executor-allocation-manager")
+      .set("spark.dynamicAllocation.enabled", "true")
+    intercept[SparkException] { new SparkContext(conf) }
+
+    // Only min
+    val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
+    intercept[SparkException] { new SparkContext(conf1) }
+
+    // Only max
+    val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
+    intercept[SparkException] { new SparkContext(conf2) }
+
+    // Both min and max, but min > max
+    intercept[SparkException] { createSparkContext(2, 1) }
+
+    // Both min and max, and min == max
+    val sc1 = createSparkContext(1, 1)
+    assert(sc1.executorAllocationManager.isDefined)
+    sc1.stop()
+
+    // Both min and max, and min < max
+    val sc2 = createSparkContext(1, 2)
+    assert(sc2.executorAllocationManager.isDefined)
+    sc2.stop()
+  }
+
+  test("starting state") {
+    val sc = createSparkContext()
+    val manager = sc.executorAllocationManager.get
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(executorIds(manager).isEmpty)
+    assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
+    assert(removeTimes(manager).isEmpty)
+    sc.stop()
+  }
+
+  test("add executors") {
+    val sc = createSparkContext(1, 10)
+    val manager = sc.executorAllocationManager.get
+
+    // Keep adding until the limit is reached
+    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsPending(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsPending(manager) === 3)
+    assert(numExecutorsToAdd(manager) === 4)
+    assert(addExecutors(manager) === 4)
+    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsToAdd(manager) === 8)
+    assert(addExecutors(manager) === 3) // reached the limit of 10
+    assert(numExecutorsPending(manager) === 10)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsPending(manager) === 10)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Register previously requested executors
+    onExecutorAdded(manager, "first")
+    assert(numExecutorsPending(manager) === 9)
+    onExecutorAdded(manager, "second")
+    onExecutorAdded(manager, "third")
+    onExecutorAdded(manager, "fourth")
+    assert(numExecutorsPending(manager) === 6)
+    onExecutorAdded(manager, "first") // duplicates should not count
+    onExecutorAdded(manager, "second")
+    assert(numExecutorsPending(manager) === 6)
+
+    // Try adding again
+    // This should still fail because the number pending + running is still at the limit
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsToAdd(manager) === 1)
+    sc.stop()
+  }
+
+  test("remove executors") {
+    val sc = createSparkContext(5, 10)
+    val manager = sc.executorAllocationManager.get
+    (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
+
+    // Keep removing until the limit is reached
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(removeExecutor(manager, "1"))
+    assert(executorsPendingToRemove(manager).size === 1)
+    assert(executorsPendingToRemove(manager).contains("1"))
+    assert(removeExecutor(manager, "2"))
+    assert(removeExecutor(manager, "3"))
+    assert(executorsPendingToRemove(manager).size === 3)
+    assert(executorsPendingToRemove(manager).contains("2"))
+    assert(executorsPendingToRemove(manager).contains("3"))
+    assert(!removeExecutor(manager, "100")) // remove non-existent executors
+    assert(!removeExecutor(manager, "101"))
+    assert(executorsPendingToRemove(manager).size === 3)
+    assert(removeExecutor(manager, "4"))
+    assert(removeExecutor(manager, "5"))
+    assert(!removeExecutor(manager, "6")) // reached the limit of 5
+    assert(executorsPendingToRemove(manager).size === 5)
+    assert(executorsPendingToRemove(manager).contains("4"))
+    assert(executorsPendingToRemove(manager).contains("5"))
+    assert(!executorsPendingToRemove(manager).contains("6"))
+
+    // Kill executors previously requested to remove
+    onExecutorRemoved(manager, "1")
+    assert(executorsPendingToRemove(manager).size === 4)
+    assert(!executorsPendingToRemove(manager).contains("1"))
+    onExecutorRemoved(manager, "2")
+    onExecutorRemoved(manager, "3")
+    assert(executorsPendingToRemove(manager).size === 2)
+    assert(!executorsPendingToRemove(manager).contains("2"))
+    assert(!executorsPendingToRemove(manager).contains("3"))
+    onExecutorRemoved(manager, "2") // duplicates should not count
+    onExecutorRemoved(manager, "3")
+    assert(executorsPendingToRemove(manager).size === 2)
+    onExecutorRemoved(manager, "4")
+    onExecutorRemoved(manager, "5")
+    assert(executorsPendingToRemove(manager).isEmpty)
+
+    // Try removing again
+    // This should still fail because the number pending + running is still at the limit
+    assert(!removeExecutor(manager, "7"))
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(!removeExecutor(manager, "8"))
+    assert(executorsPendingToRemove(manager).isEmpty)
+    sc.stop()
+  }
add and remove") { + val sc = createSparkContext(5, 10) + val manager = sc.executorAllocationManager.get + + // Add a few executors + assert(addExecutors(manager) === 1) + assert(addExecutors(manager) === 2) + assert(addExecutors(manager) === 4) + onExecutorAdded(manager, "1") + onExecutorAdded(manager, "2") + onExecutorAdded(manager, "3") + onExecutorAdded(manager, "4") + onExecutorAdded(manager, "5") + onExecutorAdded(manager, "6") + onExecutorAdded(manager, "7") + assert(executorIds(manager).size === 7) + + // Remove until limit + assert(removeExecutor(manager, "1")) + assert(removeExecutor(manager, "2")) + assert(!removeExecutor(manager, "3")) // lower limit reached + assert(!removeExecutor(manager, "4")) + onExecutorRemoved(manager, "1") + onExecutorRemoved(manager, "2") + assert(executorIds(manager).size === 5) + + // Add until limit + assert(addExecutors(manager) === 5) // upper limit reached + assert(addExecutors(manager) === 0) + assert(!removeExecutor(manager, "3")) // still at lower limit + assert(!removeExecutor(manager, "4")) + onExecutorAdded(manager, "8") + onExecutorAdded(manager, "9") + onExecutorAdded(manager, "10") + onExecutorAdded(manager, "11") + onExecutorAdded(manager, "12") + assert(executorIds(manager).size === 10) + + // Remove succeeds again, now that we are no longer at the lower limit + assert(removeExecutor(manager, "3")) + assert(removeExecutor(manager, "4")) + assert(removeExecutor(manager, "5")) + assert(removeExecutor(manager, "6")) + assert(executorIds(manager).size === 10) + assert(addExecutors(manager) === 0) // still at upper limit + onExecutorRemoved(manager, "3") + onExecutorRemoved(manager, "4") + assert(executorIds(manager).size === 8) + + // Add succeeds again, now that we are no longer at the upper limit + // Number of executors added restarts at 1 + assert(addExecutors(manager) === 1) + assert(addExecutors(manager) === 1) // upper limit reached again + assert(addExecutors(manager) === 0) + assert(executorIds(manager).size === 8) + onExecutorRemoved(manager, "5") + onExecutorRemoved(manager, "6") + onExecutorAdded(manager, "13") + onExecutorAdded(manager, "14") + assert(executorIds(manager).size === 8) + assert(addExecutors(manager) === 1) + assert(addExecutors(manager) === 1) // upper limit reached again + assert(addExecutors(manager) === 0) + onExecutorAdded(manager, "15") + onExecutorAdded(manager, "16") + assert(executorIds(manager).size === 10) + sc.stop() + } + + test("starting/canceling add timer") { + val sc = createSparkContext(2, 10) + val clock = new TestClock(8888L) + val manager = sc.executorAllocationManager.get + manager.setClock(clock) + + // Starting add timer is idempotent + assert(addTime(manager) === NOT_SET) + onSchedulerBacklogged(manager) + val firstAddTime = addTime(manager) + assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000) + clock.tick(100L) + onSchedulerBacklogged(manager) + assert(addTime(manager) === firstAddTime) // timer is already started + clock.tick(200L) + onSchedulerBacklogged(manager) + assert(addTime(manager) === firstAddTime) + onSchedulerQueueEmpty(manager) + + // Restart add timer + clock.tick(1000L) + assert(addTime(manager) === NOT_SET) + onSchedulerBacklogged(manager) + val secondAddTime = addTime(manager) + assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000) + clock.tick(100L) + onSchedulerBacklogged(manager) + assert(addTime(manager) === secondAddTime) // timer is already started + assert(addTime(manager) !== firstAddTime) + assert(firstAddTime 
+    assert(firstAddTime !== secondAddTime)
+  }
+
+  test("starting/canceling remove timers") {
+    val sc = createSparkContext(2, 10)
+    val clock = new TestClock(14444L)
+    val manager = sc.executorAllocationManager.get
+    manager.setClock(clock)
+
+    // Starting remove timer is idempotent for each executor
+    assert(removeTimes(manager).isEmpty)
+    onExecutorIdle(manager, "1")
+    assert(removeTimes(manager).size === 1)
+    assert(removeTimes(manager).contains("1"))
+    val firstRemoveTime = removeTimes(manager)("1")
+    assert(firstRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
+    clock.tick(100L)
+    onExecutorIdle(manager, "1")
+    assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already started
+    clock.tick(200L)
+    onExecutorIdle(manager, "1")
+    assert(removeTimes(manager)("1") === firstRemoveTime)
+    clock.tick(300L)
+    onExecutorIdle(manager, "2")
+    assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
+    assert(removeTimes(manager)("2") === clock.getTimeMillis + executorIdleTimeout * 1000)
+    clock.tick(400L)
+    onExecutorIdle(manager, "3")
+    assert(removeTimes(manager)("3") !== firstRemoveTime)
+    assert(removeTimes(manager)("3") === clock.getTimeMillis + executorIdleTimeout * 1000)
+    assert(removeTimes(manager).size === 3)
+    assert(removeTimes(manager).contains("2"))
+    assert(removeTimes(manager).contains("3"))
+
+    // Restart remove timer
+    clock.tick(1000L)
+    onExecutorBusy(manager, "1")
+    assert(removeTimes(manager).size === 2)
+    onExecutorIdle(manager, "1")
+    assert(removeTimes(manager).size === 3)
+    assert(removeTimes(manager).contains("1"))
+    val secondRemoveTime = removeTimes(manager)("1")
+    assert(secondRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
+    assert(removeTimes(manager)("1") === secondRemoveTime) // timer is already started
+    assert(removeTimes(manager)("1") !== firstRemoveTime)
+    assert(firstRemoveTime !== secondRemoveTime)
+  }
+
+  test("mock polling loop with no events") {
+    val sc = createSparkContext(1, 20)
+    val manager = sc.executorAllocationManager.get
+    val clock = new TestClock(2020L)
+    manager.setClock(clock)
+
+    // No events - we should not be adding or removing
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    clock.tick(100L)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    clock.tick(1000L)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    clock.tick(10000L)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+  }
+
+  test("mock polling loop add behavior") {
+    val sc = createSparkContext(1, 20)
+    val clock = new TestClock(2020L)
+    val manager = sc.executorAllocationManager.get
+    manager.setClock(clock)
+
+    // Scheduler queue backlogged
+    onSchedulerBacklogged(manager)
+    clock.tick(schedulerBacklogTimeout * 1000 / 2)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
+    clock.tick(schedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 1) // first timer exceeded
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000 / 2)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
+
+    // Scheduler queue drained
+    onSchedulerQueueEmpty(manager)
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 7) // timer is canceled
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 7)
+
+    // Scheduler queue backlogged again
+    onSchedulerBacklogged(manager)
+    clock.tick(schedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 7 + 1 + 2)
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 20) // limit reached
+  }
+
+  test("mock polling loop remove behavior") {
+    val sc = createSparkContext(1, 20)
+    val clock = new TestClock(2020L)
+    val manager = sc.executorAllocationManager.get
+    manager.setClock(clock)
+
+    // Remove idle executors on timeout
+    onExecutorAdded(manager, "executor-1")
+    onExecutorAdded(manager, "executor-2")
+    onExecutorAdded(manager, "executor-3")
+    assert(removeTimes(manager).size === 3)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    clock.tick(executorIdleTimeout * 1000 / 2)
+    schedule(manager)
+    assert(removeTimes(manager).size === 3) // idle threshold not reached yet
+    assert(executorsPendingToRemove(manager).isEmpty)
+    clock.tick(executorIdleTimeout * 1000)
+    schedule(manager)
+    assert(removeTimes(manager).isEmpty) // idle threshold exceeded
+    assert(executorsPendingToRemove(manager).size === 2) // limit reached (1 executor remaining)
+
+    // Mark a subset as busy - only idle executors should be removed
+    onExecutorAdded(manager, "executor-4")
+    onExecutorAdded(manager, "executor-5")
+    onExecutorAdded(manager, "executor-6")
+    onExecutorAdded(manager, "executor-7")
+    assert(removeTimes(manager).size === 5) // 5 active executors
+    assert(executorsPendingToRemove(manager).size === 2) // 2 pending to be removed
+    onExecutorBusy(manager, "executor-4")
+    onExecutorBusy(manager, "executor-5")
+    onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 active ones)
+    schedule(manager)
+    assert(removeTimes(manager).size === 2) // remove only idle executors
+    assert(!removeTimes(manager).contains("executor-4"))
+    assert(!removeTimes(manager).contains("executor-5"))
+    assert(!removeTimes(manager).contains("executor-6"))
+    assert(executorsPendingToRemove(manager).size === 2)
+    clock.tick(executorIdleTimeout * 1000)
+    schedule(manager)
+    assert(removeTimes(manager).isEmpty) // idle executors are removed
+    assert(executorsPendingToRemove(manager).size === 4)
+    assert(!executorsPendingToRemove(manager).contains("executor-4"))
+    assert(!executorsPendingToRemove(manager).contains("executor-5"))
+    assert(!executorsPendingToRemove(manager).contains("executor-6"))
+
+    // Busy executors are now idle and should be removed
+    onExecutorIdle(manager, "executor-4")
+    onExecutorIdle(manager, "executor-5")
+    onExecutorIdle(manager, "executor-6")
+    schedule(manager)
+    assert(removeTimes(manager).size === 3) // 0 busy and 3 idle
+    assert(removeTimes(manager).contains("executor-4"))
+    assert(removeTimes(manager).contains("executor-5"))
+    assert(removeTimes(manager).contains("executor-6"))
+    assert(executorsPendingToRemove(manager).size === 4)
+    clock.tick(executorIdleTimeout * 1000)
+    schedule(manager)
+    assert(removeTimes(manager).isEmpty)
+    assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining)
+  }
+
+  test("listeners trigger add executors correctly") {
+    val sc = createSparkContext(2, 10)
+    val manager = sc.executorAllocationManager.get
+    assert(addTime(manager) === NOT_SET)
+
+    // Starting a stage should start the add timer
+    val numTasks = 10
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, numTasks)))
+    assert(addTime(manager) !== NOT_SET)
+
+    // Starting a subset of the tasks should not cancel the add timer
+    val taskInfos = (0 to numTasks - 1).map { i => createTaskInfo(i, i, "executor-1") }
+    taskInfos.tail.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
+    assert(addTime(manager) !== NOT_SET)
+
+    // Starting all remaining tasks should cancel the add timer
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfos.head))
+    assert(addTime(manager) === NOT_SET)
+
+    // Start two different stages
+    // The add timer should be canceled only if all tasks in both stages start running
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, numTasks)))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, numTasks)))
+    assert(addTime(manager) !== NOT_SET)
+    taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) }
+    assert(addTime(manager) !== NOT_SET)
+    taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, info)) }
+    assert(addTime(manager) === NOT_SET)
+  }
+
+  test("listeners trigger remove executors correctly") {
+    val sc = createSparkContext(2, 10)
+    val manager = sc.executorAllocationManager.get
+    assert(removeTimes(manager).isEmpty)
+
+    // Added executors should start the remove timers for each executor
+    (1 to 5).map("executor-" + _).foreach { id => onExecutorAdded(manager, id) }
+    assert(removeTimes(manager).size === 5)
+
+    // Starting a task cancels the remove timer for that executor
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1")))
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2")))
+    assert(removeTimes(manager).size === 3)
+    assert(!removeTimes(manager).contains("executor-1"))
+    assert(!removeTimes(manager).contains("executor-2"))
+
+    // Finishing all tasks running on an executor should start the remove timer for that executor
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(
+      0, 0, "task-type", Success, createTaskInfo(0, 0, "executor-1"), new TaskMetrics))
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(
+      0, 0, "task-type", Success, createTaskInfo(2, 2, "executor-2"), new TaskMetrics))
+    assert(removeTimes(manager).size === 4)
+    assert(!removeTimes(manager).contains("executor-1")) // executor-1 has not finished yet
+    assert(removeTimes(manager).contains("executor-2"))
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(
+      0, 0, "task-type", Success, createTaskInfo(1, 1, "executor-1"), new TaskMetrics))
+    assert(removeTimes(manager).size === 5)
+    assert(removeTimes(manager).contains("executor-1")) // executor-1 has now finished
+  }
+
test("listeners trigger add and remove executor callbacks correctly") { + val sc = createSparkContext(2, 10) + val manager = sc.executorAllocationManager.get + assert(executorIds(manager).isEmpty) + assert(removeTimes(manager).isEmpty) + + // New executors have registered + sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( + 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + assert(executorIds(manager).size === 1) + assert(executorIds(manager).contains("executor-1")) + assert(removeTimes(manager).size === 1) + assert(removeTimes(manager).contains("executor-1")) + sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( + 0L, BlockManagerId("executor-2", "host2", 1), 100L)) + assert(executorIds(manager).size === 2) + assert(executorIds(manager).contains("executor-2")) + assert(removeTimes(manager).size === 2) + assert(removeTimes(manager).contains("executor-2")) + + // Existing executors have disconnected + sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved( + 0L, BlockManagerId("executor-1", "host1", 1))) + assert(executorIds(manager).size === 1) + assert(!executorIds(manager).contains("executor-1")) + assert(removeTimes(manager).size === 1) + assert(!removeTimes(manager).contains("executor-1")) + + // Unknown executor has disconnected + sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved( + 0L, BlockManagerId("executor-3", "host3", 1))) + assert(executorIds(manager).size === 1) + assert(removeTimes(manager).size === 1) + } + +} + +/** + * Helper methods for testing ExecutorAllocationManager. + * This includes methods to access private methods and fields in ExecutorAllocationManager. + */ +private object ExecutorAllocationManagerSuite extends PrivateMethodTester { + private val schedulerBacklogTimeout = 1L + private val sustainedSchedulerBacklogTimeout = 2L + private val executorIdleTimeout = 3L + + private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.minExecutors", minExecutors.toString) + .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString) + .set("spark.dynamicAllocation.schedulerBacklogTimeout", schedulerBacklogTimeout.toString) + .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", + sustainedSchedulerBacklogTimeout.toString) + .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString) + .set("spark.dynamicAllocation.testing", "true") + new SparkContext(conf) + } + + private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = { + new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details") + } + + private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = { + new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false) + } + + /* ------------------------------------------------------- * + | Helper methods for accessing private methods and fields | + * ------------------------------------------------------- */ + + private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd) + private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending) + private val _executorsPendingToRemove = + PrivateMethod[collection.Set[String]]('executorsPendingToRemove) + private val _executorIds = PrivateMethod[collection.Set[String]]('executorIds) + private val _addTime = PrivateMethod[Long]('addTime) + private val 
+  private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
+  private val _schedule = PrivateMethod[Unit]('schedule)
+  private val _addExecutors = PrivateMethod[Int]('addExecutors)
+  private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
+  private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
+  private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
+  private val _onSchedulerBacklogged = PrivateMethod[Unit]('onSchedulerBacklogged)
+  private val _onSchedulerQueueEmpty = PrivateMethod[Unit]('onSchedulerQueueEmpty)
+  private val _onExecutorIdle = PrivateMethod[Unit]('onExecutorIdle)
+  private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
+
+  private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _numExecutorsToAdd()
+  }
+
+  private def numExecutorsPending(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _numExecutorsPending()
+  }
+
+  private def executorsPendingToRemove(
+      manager: ExecutorAllocationManager): collection.Set[String] = {
+    manager invokePrivate _executorsPendingToRemove()
+  }
+
+  private def executorIds(manager: ExecutorAllocationManager): collection.Set[String] = {
+    manager invokePrivate _executorIds()
+  }
+
+  private def addTime(manager: ExecutorAllocationManager): Long = {
+    manager invokePrivate _addTime()
+  }
+
+  private def removeTimes(manager: ExecutorAllocationManager): collection.Map[String, Long] = {
+    manager invokePrivate _removeTimes()
+  }
+
+  private def schedule(manager: ExecutorAllocationManager): Unit = {
+    manager invokePrivate _schedule()
+  }
+
+  private def addExecutors(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _addExecutors()
+  }
+
+  private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {
+    manager invokePrivate _removeExecutor(id)
+  }
+
+  private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): Unit = {
+    manager invokePrivate _onExecutorAdded(id)
+  }
+
+  private def onExecutorRemoved(manager: ExecutorAllocationManager, id: String): Unit = {
+    manager invokePrivate _onExecutorRemoved(id)
+  }
+
+  private def onSchedulerBacklogged(manager: ExecutorAllocationManager): Unit = {
+    manager invokePrivate _onSchedulerBacklogged()
+  }
+
+  private def onSchedulerQueueEmpty(manager: ExecutorAllocationManager): Unit = {
+    manager invokePrivate _onSchedulerQueueEmpty()
+  }
+
+  private def onExecutorIdle(manager: ExecutorAllocationManager, id: String): Unit = {
+    manager invokePrivate _onExecutorIdle(id)
+  }
+
+  private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): Unit = {
+    manager invokePrivate _onExecutorBusy(id)
+  }
+}
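For anyone unfamiliar with the `PrivateMethodTester` pattern the suite leans on, here it is in isolation (a generic sketch, unrelated to the Spark classes above; `Counter` and `bump` are made-up names):

```scala
import org.scalatest.PrivateMethodTester

class Counter {
  private def bump(n: Int): Int = n + 1 // private, like the methods under test above
}

object CounterAccess extends PrivateMethodTester {
  // The symbol must match the private method's name exactly.
  private val _bump = PrivateMethod[Int]('bump)
  def bump(c: Counter, n: Int): Int = c invokePrivate _bump(n)
}

// CounterAccess.bump(new Counter, 41) returns 42 via reflection,
// without making bump public or moving the test into Counter's scope.
```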

http://git-wip-us.apache.org/repos/asf/spark/blob/8d59b37b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6807379..e90672c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -505,7 +505,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         driver ! x
 
       case RequestExecutors(requestedTotal) =>
-        logInfo(s"Driver requested a total number of executors of $requestedTotal.")
+        logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
         Option(allocator) match {
          case Some(a) => a.requestTotalExecutors(requestedTotal)
          case None => logWarning("Container allocator is not ready to request executors yet.")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org