spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-9731] Standalone scheduling incorrect cores if spark.executor.cores is not set
Date Sat, 08 Aug 2015 06:36:31 GMT
Repository: spark
Updated Branches:
  refs/heads/master c564b2744 -> ef062c159


[SPARK-9731] Standalone scheduling incorrect cores if spark.executor.cores is not set

The issue only occurs when `spark.executor.cores` is not set and `spark.executor.memory` is set to
more than half of a worker's free memory.
For example, given a worker with 4 GB of memory and 10 cores, setting `spark.executor.memory` to
3 GB assigns only 1 core to the executor; the correct number is 10. The cause is that when
`spark.executor.cores` is unset, cores are granted one at a time to a single executor per worker,
and the old predicate repeated the memory check on every grant, so scheduling stopped as soon as
the worker could not fit a second 3 GB executor.
I've added a unit test to illustrate the issue.
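As a minimal, self-contained sketch of why this happens (hypothetical object and variable names;
it mirrors the `canLaunchExecutor` predicate in `Master.scheduleExecutorsOnWorkers` for a single
worker, and omits the executor-limit check for brevity):

----------------------------------------------------------------------
object Spark9731Sketch {
  def main(args: Array[String]): Unit = {
    // Scenario from the bug report: one worker with 10 cores and 4 GB,
    // spark.executor.memory = 3g, spark.executor.cores unset.
    val workerCoresFree      = 10
    val workerMemoryFree     = 4096  // MB
    val memoryPerExecutor    = 3072  // MB
    val minCoresPerExecutor  = 1     // default when spark.executor.cores is unset
    val oneExecutorPerWorker = true  // also follows from the unset setting

    def assignedCoresAfterScheduling(oldLogic: Boolean): Int = {
      var coresToAssign     = workerCoresFree
      var assignedCores     = 0
      var assignedExecutors = 0

      def canLaunchExecutor: Boolean = {
        val keepScheduling = coresToAssign >= minCoresPerExecutor
        val enoughCores    = workerCoresFree - assignedCores >= minCoresPerExecutor
        val assignedMemory = assignedExecutors * memoryPerExecutor
        val enoughMemory   = workerMemoryFree - assignedMemory >= memoryPerExecutor
        val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors == 0
        if (oldLogic) {
          // Old predicate: the memory check runs even when we are only
          // adding cores to the executor already scheduled on this worker,
          // so scheduling stops after the first core.
          keepScheduling && enoughCores && enoughMemory
        } else if (launchingNewExecutor) {
          keepScheduling && enoughCores && enoughMemory
        } else {
          // Fixed predicate: extra cores for an existing executor
          // consume no extra memory, so skip the memory check.
          keepScheduling && enoughCores
        }
      }

      while (canLaunchExecutor) {
        coresToAssign -= minCoresPerExecutor
        assignedCores += minCoresPerExecutor
        if (assignedExecutors == 0) assignedExecutors = 1
      }
      assignedCores
    }

    println(s"old logic assigns ${assignedCoresAfterScheduling(oldLogic = true)} core(s)")  // 1
    println(s"new logic assigns ${assignedCoresAfterScheduling(oldLogic = false)} cores")   // 10
  }
}
----------------------------------------------------------------------

Running the sketch prints 1 core for the old logic and 10 for the new.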

Author: Carson Wang <carson.wang@intel.com>

Closes #8017 from carsonwang/SPARK-9731 and squashes the following commits:

d09ec48 [Carson Wang] Fix code style
86b651f [Carson Wang] Simplify the code
943cc4c [Carson Wang] fix scheduling correct cores to executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef062c15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef062c15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef062c15

Branch: refs/heads/master
Commit: ef062c15992b0d08554495b8ea837bef3fabf6e9
Parents: c564b27
Author: Carson Wang <carson.wang@intel.com>
Authored: Fri Aug 7 23:36:26 2015 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Fri Aug 7 23:36:26 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala | 26 +++++++++++---------
 .../spark/deploy/master/MasterSuite.scala       | 15 +++++++++++
 2 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ef062c15/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e38e437..9217202 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -581,20 +581,22 @@ private[deploy] class Master(
 
     /** Return whether the specified worker can launch an executor for this app. */
     def canLaunchExecutor(pos: Int): Boolean = {
+      val keepScheduling = coresToAssign >= minCoresPerExecutor
+      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
+
       // If we allow multiple executors per worker, then we can always launch new executors.
-      // Otherwise, we may have already started assigning cores to the executor on this worker.
+      // Otherwise, if there is already an executor on this worker, just give it more cores.
       val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
-      val underLimit =
-        if (launchingNewExecutor) {
-          assignedExecutors.sum + app.executors.size < app.executorLimit
-        } else {
-          true
-        }
-      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
-      usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor &&
-      usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor &&
-      coresToAssign >= minCoresPerExecutor &&
-      underLimit
+      if (launchingNewExecutor) {
+        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
+        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
+        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
+        keepScheduling && enoughCores && enoughMemory && underLimit
+      } else {
+        // We're adding cores to an existing executor, so no need
+        // to check memory and executor limits
+        keepScheduling && enoughCores
+      }
     }
 
     // Keep launching executors until no more workers can accommodate any

http://git-wip-us.apache.org/repos/asf/spark/blob/ef062c15/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index ae0e037..20d0201 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -151,6 +151,14 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
     basicScheduling(spreadOut = false)
   }
 
+  test("basic scheduling with more memory - spread out") {
+    basicSchedulingWithMoreMemory(spreadOut = true)
+  }
+
+  test("basic scheduling with more memory - no spread out") {
+    basicSchedulingWithMoreMemory(spreadOut = false)
+  }
+
   test("scheduling with max cores - spread out") {
     schedulingWithMaxCores(spreadOut = true)
   }
@@ -214,6 +222,13 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
     assert(scheduledCores === Array(10, 10, 10))
   }
 
+  private def basicSchedulingWithMoreMemory(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(3072)
+    val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores === Array(10, 10, 10))
+  }
+
   private def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
     val master = makeMaster()
     val appInfo1 = makeAppInfo(1024, maxCores = Some(8))

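For reference, a configuration that hits this code path looks like the sketch below (the host
name is hypothetical; any deployment where `spark.executor.memory` exceeds half of a worker's
free memory behaves the same way):

----------------------------------------------------------------------
# spark-defaults.conf (sketch) -- spark.executor.cores deliberately unset
spark.master           spark://master:7077
spark.executor.memory  3g
# On a 4 GB / 10-core worker, the executor received 1 core before this
# fix and receives all 10 cores after it.
----------------------------------------------------------------------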
