From: tgraves@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-13723][YARN] Change behavior of --num-executors with dynamic allocation.
Date: Thu, 23 Jun 2016 19:03:54 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master a410814c8 -> 738f134bf


[SPARK-13723][YARN] Change behavior of --num-executors with dynamic allocation.

## What changes were proposed in this pull request?

This changes the behavior of --num-executors and spark.executor.instances when dynamic allocation is enabled. Instead of turning dynamic allocation off, the value is used as the initial number of executors.

This change was discussed on [SPARK-13723](https://issues.apache.org/jira/browse/SPARK-13723). I highly recommend making it while we can still change the behavior for 2.0.0. In practice, the 1.x behavior surprises users (it is not clear that setting --num-executors disables dynamic allocation) and wastes cluster resources because users rarely notice the log message.

## How was this patch tested?

This patch updates existing tests and adds a test for Utils.getDynamicAllocationInitialExecutors.

Author: Ryan Blue

Closes #13338 from rdblue/SPARK-13723-num-executors-with-dynamic-allocation.
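As a quick illustration of the new rule (a standalone sketch, not code from the patch; the real implementation is `Utils.getDynamicAllocationInitialExecutors` in the diff below), the initial executor count is simply the maximum of the three relevant settings:

```scala
// Standalone sketch of the precedence rule this patch introduces. The names
// here (InitialExecutorsSketch, initialExecutors) are illustrative only.
object InitialExecutorsSketch {
  // minExecutors:      spark.dynamicAllocation.minExecutors (default 0)
  // initialExecutors:  spark.dynamicAllocation.initialExecutors (defaults to minExecutors)
  // executorInstances: spark.executor.instances / --num-executors, if set
  def initialExecutors(
      minExecutors: Int,
      initialExecutors: Int,
      executorInstances: Option[Int]): Int = {
    Seq(minExecutors, initialExecutors, executorInstances.getOrElse(0)).max
  }

  def main(args: Array[String]): Unit = {
    // --num-executors 4 with minExecutors 3: the job now starts with 4 executors
    // and keeps dynamic allocation on, instead of being pinned at 4 as in 1.x.
    println(initialExecutors(minExecutors = 3, initialExecutors = 3, executorInstances = Some(4)))
  }
}
```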
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/738f134b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/738f134b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/738f134b

Branch: refs/heads/master
Commit: 738f134bf4bf07bafb17e7066cf1a36e315872c2
Parents: a410814
Author: Ryan Blue
Authored: Thu Jun 23 14:03:46 2016 -0500
Committer: Tom Graves
Committed: Thu Jun 23 14:03:46 2016 -0500

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 11 +++++-----
 .../spark/deploy/SparkSubmitArguments.scala     |  2 ++
 .../scala/org/apache/spark/util/Utils.scala     | 22 ++++++++++++--------
 .../org/apache/spark/util/UtilsSuite.scala      | 18 +++++++++++++++-
 docs/configuration.md                           |  3 +++
 docs/running-on-yarn.md                         |  2 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  2 +-
 7 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/738f134b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 0926d05..932ba16 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -25,9 +25,10 @@ import scala.util.control.ControlThrowable
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -87,11 +88,9 @@ private[spark] class ExecutorAllocationManager(
   import ExecutorAllocationManager._
 
   // Lower and upper bounds on the number of executors.
-  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
-  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
-    Integer.MAX_VALUE)
-  private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors",
-    minNumExecutors)
+  private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+  private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+  private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
 
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
   private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(

http://git-wip-us.apache.org/repos/asf/spark/blob/738f134b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 206c130..f1761e7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -550,6 +550,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |                              (Default: 1).
         |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
         |  --num-executors NUM         Number of executors to launch (Default: 2).
+        |                              If dynamic allocation is enabled, the initial number of
+        |                              executors will be at least NUM.
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
         |                              working directory of each executor.
         |  --principal PRINCIPAL       Principal to be used to login to KDC, while running on

http://git-wip-us.apache.org/repos/asf/spark/blob/738f134b/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 17d193b..f77cc2f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -52,6 +52,7 @@ import org.slf4j.Logger
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
@@ -2309,21 +2310,24 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Return whether dynamic allocation is enabled in the given conf
-   * Dynamic allocation and explicitly setting the number of executors are inherently
-   * incompatible. In environments where dynamic allocation is turned on by default,
-   * the latter should override the former (SPARK-9092).
+   * Return whether dynamic allocation is enabled in the given conf.
    */
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    val numExecutor = conf.getInt("spark.executor.instances", 0)
     val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
-    if (numExecutor != 0 && dynamicAllocationEnabled) {
-      logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
-    }
-    numExecutor == 0 && dynamicAllocationEnabled &&
+    dynamicAllocationEnabled &&
       (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false))
   }
 
+  /**
+   * Return the initial number of executors for dynamic allocation.
+   */
+  def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
+    Seq(
+      conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
+      conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
+      conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
+  }
+
   def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
     val resource = createResource
     try f.apply(resource) finally resource.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/738f134b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a5363f0..e3a8e83 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -761,13 +761,29 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.dynamicAllocation.enabled", "true")) === true)
     assert(Utils.isDynamicAllocationEnabled(
-      conf.set("spark.executor.instances", "1")) === false)
+      conf.set("spark.executor.instances", "1")) === true)
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.executor.instances", "0")) === true)
     assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false)
     assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true")))
   }
 
+  test("getDynamicAllocationInitialExecutors") {
+    val conf = new SparkConf()
+    assert(Utils.getDynamicAllocationInitialExecutors(conf) === 0)
+    assert(Utils.getDynamicAllocationInitialExecutors(
+      conf.set("spark.dynamicAllocation.minExecutors", "3")) === 3)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors
+      conf.set("spark.executor.instances", "2")) === 3)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances
+      conf.set("spark.executor.instances", "4")) === 4)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances
+      conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors
+      conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
+  }
+
+
   test("encodeFileNameToURIRawPath") {
     assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
     assert(Utils.encodeFileNameToURIRawPath("abc xyz") === "abc%20xyz")

http://git-wip-us.apache.org/repos/asf/spark/blob/738f134b/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index fbda91c..cee59cf 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1236,6 +1236,9 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
   <td>
     Initial number of executors to run if dynamic allocation is enabled.
+    <br /><br />
+    If `--num-executors` (or `spark.executor.instances`) is set and larger than this value, it will
+    be used as the initial number of executors.
   </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/738f134b/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 9833806..dbd46cc 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -244,7 +244,7 @@ To use a custom metrics.properties for the application master and executors, upd
   <td><code>spark.executor.instances</code></td>
   <td>2</td>
   <td>
-    The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>. If both <code>spark.dynamicAllocation.enabled</code> and <code>spark.executor.instances</code> are specified, dynamic allocation is turned off and the specified number of <code>spark.executor.instances</code> is used.
+    The number of executors for static allocation. With <code>spark.dynamicAllocation.enabled</code>, the initial set of executors will be at least this large.
   </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/738f134b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index de6cd94..156a7a3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -520,7 +520,7 @@ object YarnSparkHadoopUtil {
       numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
     if (Utils.isDynamicAllocationEnabled(conf)) {
       val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
-      val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS)
+      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
       val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
       require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
         s"initial executor number $initialNumExecutors must between min executor number " +
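For completeness, a hedged end-to-end sketch using only the public SparkConf API (the object name `DynamicAllocationBehavior` and the inlined resolution are illustrative; `Utils` is `private[spark]`, so user code cannot call `getDynamicAllocationInitialExecutors` directly):

```scala
import org.apache.spark.SparkConf

// With this patch, setting spark.executor.instances (--num-executors) no longer
// silently disables dynamic allocation; it only seeds the initial executor count.
object DynamicAllocationBehavior {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "3")
      .set("spark.executor.instances", "4") // equivalent to --num-executors 4

    // Post-patch semantics of Utils.isDynamicAllocationEnabled on a cluster
    // master: the instance count no longer factors into the check.
    val enabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)

    // Post-patch initial count: the max of min, initial, and instances.
    val min = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
    val initial = Seq(
      min,
      conf.getInt("spark.dynamicAllocation.initialExecutors", min),
      conf.getInt("spark.executor.instances", 0)).max

    println(s"dynamic allocation enabled: $enabled, initial executors: $initial")
    // prints: dynamic allocation enabled: true, initial executors: 4
  }
}
```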