spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-10790] [YARN] Fix initial executor number not set issue and consolidate the codes
Date Mon, 28 Sep 2015 13:39:06 GMT
Repository: spark
Updated Branches:
  refs/heads/master d8d50ed38 -> 353c30bd7


[SPARK-10790] [YARN] Fix initial executor number not set issue and consolidate the codes

This bug was introduced in [SPARK-9092](https://issues.apache.org/jira/browse/SPARK-9092):
`targetExecutorNumber` should fall back to `minExecutors` when `initialExecutors` is not set.
Using 0 instead triggers the problem described in [SPARK-10790](https://issues.apache.org/jira/browse/SPARK-10790).
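
For illustration, a minimal sketch of the intended fallback (not the patch itself; `conf`
stands in for the application's `SparkConf`):

    // Before the fix, initialExecutors silently defaulted to 0 when unset;
    // after the fix it falls back to minExecutors, as documented.
    val minExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
    val initialExecutors =
      conf.getInt("spark.dynamicAllocation.initialExecutors", minExecutors)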

Also consolidate and simplify similar code snippets to keep their semantics consistent.

Author: jerryshao <sshao@hortonworks.com>

Closes #8910 from jerryshao/SPARK-10790.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/353c30bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/353c30bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/353c30bd

Branch: refs/heads/master
Commit: 353c30bd7dfbd3b76fc8bc9a6dfab9321439a34b
Parents: d8d50ed
Author: jerryshao <sshao@hortonworks.com>
Authored: Mon Sep 28 06:38:54 2015 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Mon Sep 28 06:38:54 2015 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/ClientArguments.scala     | 20 +----------------
 .../spark/deploy/yarn/YarnAllocator.scala       |  6 +----
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 23 ++++++++++++++++++++
 .../cluster/YarnClusterSchedulerBackend.scala   | 18 ++-------------
 4 files changed, 27 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/353c30bd/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 54f62e6..1165061 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -81,25 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orNull
     // If dynamic allocation is enabled, start at the configured initial number of executors.
     // Default to minExecutors if no initialExecutors is set.
-    if (isDynamicAllocationEnabled) {
-      val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
-      val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
-      val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
-      val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
-      val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
-      val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
-
-      // If defined, initial executors must be between min and max
-      if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
-        throw new IllegalArgumentException(
-          s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
-      }
-
-      numExecutors = initialNumExecutors
-    } else {
-      val numExecutorsConf = "spark.executor.instances"
-      numExecutors = sparkConf.getInt(numExecutorsConf, numExecutors)
-    }
+    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
     principal = Option(principal)
       .orElse(sparkConf.getOption("spark.yarn.principal"))
       .orNull

http://git-wip-us.apache.org/repos/asf/spark/blob/353c30bd/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index fd88b8b..9e1ef1b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -89,11 +89,7 @@ private[yarn] class YarnAllocator(
   @volatile private var numExecutorsFailed = 0
 
   @volatile private var targetNumExecutors =
-    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
-      sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
-    } else {
-      sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
-    }
+    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
 
   // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
   // list of requesters that should be responded to once we find out why the given executor

http://git-wip-us.apache.org/repos/asf/spark/blob/353c30bd/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 445d3dc..f276e7e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -314,5 +314,28 @@ object YarnSparkHadoopUtil {
   def getClassPathSeparator(): String = {
     classPathSeparatorField.get(null).asInstanceOf[String]
   }
+
+  /**
+   * Getting the initial target number of executors depends on whether dynamic allocation is
+   * enabled.
+   */
+  def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+      val initialNumExecutors =
+        conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
+      val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
+        s"initial executor number $initialNumExecutors must be between min executor number " +
+          s"$minNumExecutors and max executor number $maxNumExecutors")
+
+      initialNumExecutors
+    } else {
+      val targetNumExecutors =
+        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
+      // System property can override environment variable.
+      conf.getInt("spark.executor.instances", targetNumExecutors)
+    }
+  }
 }
 

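For illustration only (not part of the commit), a minimal usage sketch of the consolidated
helper; the configuration values are hypothetical:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")

    // initialExecutors is unset, so this now returns minExecutors (2), not 0.
    val target = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)

    // Without dynamic allocation, SPARK_EXECUTOR_INSTANCES is consulted first and
    // spark.executor.instances overrides it, defaulting to DEFAULT_NUMBER_EXECUTORS.
    // An initialExecutors value outside [minExecutors, maxExecutors] now fails fast
    // with an IllegalArgumentException from the require check.
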
http://git-wip-us.apache.org/repos/asf/spark/blob/353c30bd/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 1aed5a1..50b699f 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -17,21 +17,13 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.net.NetworkInterface
-
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.yarn.api.records.NodeState
-import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.{IntParam, Utils}
+import org.apache.spark.util.Utils
 
 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -40,13 +32,7 @@ private[spark] class YarnClusterSchedulerBackend(
 
   override def start() {
     super.start()
-    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
-    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
-        .getOrElse(totalExpectedExecutors)
-    }
-    // System property can override environment variable.
-    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
   }
 
   override def applicationId(): String =


