spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-7771] [SPARK-7779] Dynamic allocation: lower default timeouts further
Date Sat, 23 May 2015 00:38:12 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 d7660dc2f -> 0be6e3b3e


[SPARK-7771] [SPARK-7779] Dynamic allocation: lower default timeouts further

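The default add time (`spark.dynamicAllocation.schedulerBacklogTimeout`) of 5s is still too slow for small jobs. Also, the current default remove
time (`spark.dynamicAllocation.executorIdleTimeout`) of 10 minutes seems rather high. This patch lowers them to 1s and 60s respectively and rephrases a few log messages.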

Author: Andrew Or <andrew@databricks.com>

Closes #6301 from andrewor14/da-minor and squashes the following commits:

6d614a6 [Andrew Or] Lower log level
2811492 [Andrew Or] Log information when requests are canceled
5fcd3eb [Andrew Or] Fix tests
3320710 [Andrew Or] Lower timeouts + rephrase a few log messages

(cherry picked from commit 3d8760d76eae41dcaab8e9aeda19619f3d5f1596)
Signed-off-by: Andrew Or <andrew@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0be6e3b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0be6e3b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0be6e3b3

Branch: refs/heads/branch-1.4
Commit: 0be6e3b3e60768012e2337d1cbf2967275007a11
Parents: d7660dc
Author: Andrew Or <andrew@databricks.com>
Authored: Fri May 22 17:37:38 2015 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Fri May 22 17:38:09 2015 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 26 ++++++++++++++------
 docs/configuration.md                           |  4 +--
 2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0be6e3b3/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 66bda68..9514604 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -91,7 +91,7 @@ private[spark] class ExecutorAllocationManager(
 
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
   private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
+    "spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
 
   // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
   private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
@@ -99,7 +99,7 @@ private[spark] class ExecutorAllocationManager(
 
   // How long an executor must be idle for before it is removed (seconds)
   private val executorIdleTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.executorIdleTimeout", "600s")
+    "spark.dynamicAllocation.executorIdleTimeout", "60s")
 
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -268,6 +268,8 @@ private[spark] class ExecutorAllocationManager(
       numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
       client.requestTotalExecutors(numExecutorsTarget)
       numExecutorsToAdd = 1
+      logInfo(s"Lowering target number of executors to $numExecutorsTarget because " +
+        s"not all requests are actually needed (previously $oldNumExecutorsTarget)")
       numExecutorsTarget - oldNumExecutorsTarget
     } else if (addTime != NOT_SET && now >= addTime) {
       val delta = addExecutors(maxNeeded)
@@ -292,9 +294,8 @@ private[spark] class ExecutorAllocationManager(
   private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
     // Do not request more executors if it would put our target over the upper bound
     if (numExecutorsTarget >= maxNumExecutors) {
-      val numExecutorsPending = numExecutorsTarget - executorIds.size
-      logDebug(s"Not adding executors because there are already ${executorIds.size} registered
" +
-        s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
+      logDebug(s"Not adding executors because our current target total " +
+        s"is already $numExecutorsTarget (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
@@ -310,10 +311,19 @@ private[spark] class ExecutorAllocationManager(
     // Ensure that our target fits within configured bounds:
     numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
 
+    val delta = numExecutorsTarget - oldNumExecutorsTarget
+
+    // If our target has not changed, do not send a message
+    // to the cluster manager and reset our exponential growth
+    if (delta == 0) {
+      numExecutorsToAdd = 1
+      return 0
+    }
+
     val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
     if (addRequestAcknowledged) {
-      val delta = numExecutorsTarget - oldNumExecutorsTarget
-      logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
+      val executorsString = "executor" + { if (delta > 1) "s" else "" }
+      logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
         s" (new desired total will be $numExecutorsTarget)")
       numExecutorsToAdd = if (delta == numExecutorsToAdd) {
         numExecutorsToAdd * 2
@@ -420,7 +430,7 @@ private[spark] class ExecutorAllocationManager(
    * This resets all variables used for adding executors.
    */
   private def onSchedulerQueueEmpty(): Unit = synchronized {
-    logDebug(s"Clearing timer to add executors because there are no more pending tasks")
+    logDebug("Clearing timer to add executors because there are no more pending tasks")
     addTime = NOT_SET
     numExecutorsToAdd = 1
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0be6e3b3/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0de8245..30508a6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1194,7 +1194,7 @@ Apart from these, the following properties are also available, and may
be useful
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
-  <td>600s</td>
+  <td>60s</td>
   <td>
     If dynamic allocation is enabled and an executor has been idle for more than this duration,

     the executor will be removed. For more detail, see this
@@ -1224,7 +1224,7 @@ Apart from these, the following properties are also available, and may
be useful
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
-  <td>5s</td>
+  <td>1s</td>
   <td>
     If dynamic allocation is enabled and there have been pending tasks backlogged for more
than
     this duration, new executors will be requested. For more detail, see this


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message