spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: SPARK-4214. With dynamic allocation, avoid outstanding requests for more...
Date Fri, 14 Nov 2014 23:51:45 GMT
Repository: spark
Updated Branches:
  refs/heads/master 37482ce5a -> ad42b2832


SPARK-4214. With dynamic allocation, avoid outstanding requests for more...

... executors than pending tasks need.

WIP. Still need to add and fix tests.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3204 from sryza/sandy-spark-4214 and squashes the following commits:

35cf0e0 [Sandy Ryza] Add comment
13b53df [Sandy Ryza] Review feedback
067465f [Sandy Ryza] Whitespace fix
6ae080c [Sandy Ryza] Add tests and get num pending tasks from ExecutorAllocationListener
531e2b6 [Sandy Ryza] SPARK-4214. With dynamic allocation, avoid outstanding requests for more
executors than pending tasks need.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad42b283
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad42b283
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad42b283

Branch: refs/heads/master
Commit: ad42b283246b93654c5fd731cd618fee74d8c4da
Parents: 37482ce
Author: Sandy Ryza <sandy@cloudera.com>
Authored: Fri Nov 14 15:51:05 2014 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Fri Nov 14 15:51:40 2014 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 55 ++++++++++++++++----
 .../spark/ExecutorAllocationManagerSuite.scala  | 48 +++++++++++++++++
 2 files changed, 94 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ad42b283/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index ef93009..88adb89 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -28,7 +28,9 @@ import org.apache.spark.scheduler._
  * the scheduler queue is not drained in N seconds, then new executors are added. If the queue
  * persists for another M seconds, then more executors are added and so on. The number added
  * in each round increases exponentially from the previous round until an upper bound on the
- * number of executors has been reached.
+ * number of executors has been reached. The upper bound is based both on a configured property
+ * and on the number of tasks pending: the policy will never increase the number of executor
+ * requests past the number needed to handle all pending tasks.
  *
  * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
  * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
@@ -82,6 +84,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
 
+  // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
+  // allocation is only supported for YARN and the default number of cores per executor in YARN is
+  // 1, but it might need to be attained differently for different cluster managers
+  private val tasksPerExecutor =
+    conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
+
   validateSettings()
 
   // Number of executors to add in the next round
@@ -110,6 +118,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   // Clock used to schedule when executors should be added and removed
   private var clock: Clock = new RealClock
 
+  // Listener for Spark events that impact the allocation policy
+  private val listener = new ExecutorAllocationListener(this)
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -141,6 +152,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
       throw new SparkException("Dynamic allocation of executors requires the external " +
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
+    if (tasksPerExecutor == 0) {
+      throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.cores")
+    }
   }
 
   /**
@@ -154,7 +168,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
    * Register for scheduler callbacks to decide when to add and remove executors.
    */
   def start(): Unit = {
-    val listener = new ExecutorAllocationListener(this)
     sc.addSparkListener(listener)
     startPolling()
   }
@@ -218,13 +231,27 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
       return 0
     }
 
-    // Request executors with respect to the upper bound
-    val actualNumExecutorsToAdd =
-      if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) {
-        numExecutorsToAdd
-      } else {
-        maxNumExecutors - numExistingExecutors
-      }
+    // The number of executors needed to satisfy all pending tasks is the number of tasks pending
+    // divided by the number of tasks each executor can fit, rounded up.
+    val maxNumExecutorsPending =
+      (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
+    if (numExecutorsPending >= maxNumExecutorsPending) {
+      logDebug(s"Not adding executors because there are already $numExecutorsPending " +
+        s"pending and pending tasks could only fill $maxNumExecutorsPending")
+      numExecutorsToAdd = 1
+      return 0
+    }
+
+    // It's never useful to request more executors than could satisfy all the pending tasks, so
+    // cap request at that amount.
+    // Also cap request with respect to the configured upper bound.
+    val maxNumExecutorsToAdd = math.min(
+      maxNumExecutorsPending - numExecutorsPending,
+      maxNumExecutors - numExistingExecutors)
+    assert(maxNumExecutorsToAdd > 0)
+
+    val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
+
     val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
     val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
     if (addRequestAcknowledged) {
@@ -445,6 +472,16 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
         blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
       allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
     }
+
+    /**
+     * An estimate of the total number of pending tasks remaining for currently running stages. Does
+     * not account for tasks which may have failed and been resubmitted.
+     */
+    def totalPendingTasks(): Int = {
+      stageIdToNumTasks.map { case (stageId, numTasks) =>
+        numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
+      }.sum
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad42b283/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 66cf60d..4b27477 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -76,6 +76,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   test("add executors") {
     sc = createSparkContext(1, 10)
     val manager = sc.executorAllocationManager.get
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Keep adding until the limit is reached
     assert(numExecutorsPending(manager) === 0)
@@ -117,6 +118,51 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("add executors capped by num pending tasks") {
+    sc = createSparkContext(1, 10)
+    val manager = sc.executorAllocationManager.get
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
+
+    // Verify that we're capped at number of tasks in the stage
+    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsPending(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsPending(manager) === 3)
+    assert(numExecutorsToAdd(manager) === 4)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsPending(manager) === 5)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that running a task reduces the cap
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
+    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that re-running a task doesn't reduce the cap further
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3)))
+    sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
+    sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsPending(manager) === 8)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsPending(manager) === 9)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that running a task once we're at our limit doesn't blow things up
+    sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsPending(manager) === 9)
+  }
+
   test("remove executors") {
     sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
@@ -170,6 +216,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   test ("interleaving add and remove") {
     sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Add a few executors
     assert(addExecutors(manager) === 1)
@@ -343,6 +390,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Scheduler queue backlogged
     onSchedulerBacklogged(manager)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message