spark-commits mailing list archives

From andrewo...@apache.org
Subject git commit: [SPARK-3822] Executor scaling mechanism for Yarn
Date Wed, 29 Oct 2014 21:01:22 GMT
Repository: spark
Updated Branches:
  refs/heads/master 353546766 -> 1df05a40e


[SPARK-3822] Executor scaling mechanism for Yarn

This is part of a broader effort to enable dynamic scaling of executors ([SPARK-3174](https://issues.apache.org/jira/browse/SPARK-3174)).
This is intended to work alongside SPARK-3795 (#2746), SPARK-3796 and SPARK-3797, but is functionally
independent of these other issues.

The logic is built on top of PraveenSeluka's changes at #2798. It differs from those changes in
a few major ways: (1) the mechanism is implemented within the existing scheduler backend framework
rather than in new `Actor` classes, and a parent abstract class `YarnSchedulerBackend` is introduced
to encapsulate the common logic for communicating with the Yarn `ApplicationMaster`. (2) The
interface for requesting executors exposed through the `SparkContext` is the same, but the
communication between the scheduler backend and the AM uses the total number of executors desired
instead of an incremental number. This is discussed in #2746 and explained in the comments in the code.
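
As a rough illustration of point (2), here is a minimal Scala sketch of the driver-side accounting,
distilled from the `CoarseGrainedSchedulerBackend` changes in the diff below; the class name
`ExecutorRequestAccounting` and the simplified field types are illustrative only, not part of the patch:

```scala
import scala.collection.mutable

// Sketch of how an incremental request is translated into the total number of
// executors desired, which is what the backend communicates to the Yarn AM.
class ExecutorRequestAccounting {
  // Executors currently registered with the driver (id -> host, simplified).
  private val executorDataMap = mutable.HashMap.empty[String, String]
  // Executors requested from the cluster manager that have not registered yet.
  private var numPendingExecutors = 0
  // Executors we have asked the cluster manager to kill that have not died yet.
  private val executorsPendingToRemove = mutable.HashSet.empty[String]

  /** Record an incremental request and return the new total to send to the cluster manager. */
  def requestExecutors(numAdditionalExecutors: Int): Int = synchronized {
    numPendingExecutors += numAdditionalExecutors
    executorDataMap.size + numPendingExecutors - executorsPendingToRemove.size
  }
}
```

Because each message to the AM carries the desired total rather than a delta, a later request simply
overrides an earlier one, and an as-yet-unfulfilled request is never double counted.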

I have tested this extensively on a stable Yarn cluster.

------------
A remaining task for this issue is to tone down the error messages emitted when an executor
is removed.
Currently, `SparkContext` and its components react as if the executor has failed, resulting
in many scary error messages and eventual timeouts. While it is not strictly necessary to fix
this in the first cut of this mechanism, it would be good to add logic to distinguish this
case. I prefer to address this in a separate PR and have filed a separate JIRA for it at
SPARK-4134.
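
For context, a minimal sketch of the kind of call that currently triggers those messages, using the
developer API added in this patch (Yarn mode only); it assumes an existing `SparkContext` named `sc`,
and the executor id "2" is made up for illustration:

```scala
sc.requestExecutors(1)   // ask the cluster manager for one more executor
sc.killExecutor("2")     // currently surfaces as an executor failure on the driver (SPARK-4134)
```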

Author: Andrew Or <andrew@databricks.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes #2840 from andrewor14/yarn-scaling-mechanism and squashes the following commits:

485863e [Andrew Or] Minor log message changes
4920be8 [Andrew Or] Clarify that public API is only for Yarn mode for now
1c57804 [Andrew Or] Reword a few comments + other review comments
6321140 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
02836c0 [Andrew Or] Limit scope of synchronization
4e2ed7f [Andrew Or] Fix bug: keep track of removed executors properly
73ade46 [Andrew Or] Wording changes (minor)
2a7a6da [Andrew Or] Add `sc.killExecutor` as a shorthand (minor)
665f229 [Andrew Or] Mima excludes
79aa2df [Andrew Or] Simplify the request interface by asking for a total
04f625b [Andrew Or] Fix race condition that causes over-allocation of executors
f4783f8 [Andrew Or] Change the semantics of requesting executors
005a124 [Andrew Or] Fix tests
4628b16 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
db4a679 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
572f5c5 [Andrew Or] Unused import (minor)
f30261c [Andrew Or] Kill multiple executors rather than one at a time
de260d9 [Andrew Or] Simplify by skipping useless null check
9c52542 [Andrew Or] Simplify by skipping the TaskSchedulerImpl
97dd1a8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
d987b3e [Andrew Or] Move addWebUIFilters to Yarn scheduler backend
7b76d0a [Andrew Or] Expose mechanism in SparkContext as developer API
47466cd [Andrew Or] Refactor common Yarn scheduler backend logic
c4dfaac [Andrew Or] Avoid thrashing when removing executors
53e8145 [Andrew Or] Start yarn actor early to listen for AM registration message
bbee669 [Andrew Or] Add mechanism in yarn client mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1df05a40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1df05a40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1df05a40

Branch: refs/heads/master
Commit: 1df05a40ebf3493b0aff46d18c0f30d2d5256c7b
Parents: 3535467
Author: Andrew Or <andrew@databricks.com>
Authored: Wed Oct 29 14:01:00 2014 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Wed Oct 29 14:01:00 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  64 +++++++--
 .../spark/scheduler/TaskSchedulerImpl.scala     |   1 -
 .../cluster/CoarseGrainedClusterMessage.scala   |  14 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 105 ++++++++++----
 .../cluster/YarnSchedulerBackend.scala          | 142 +++++++++++++++++++
 .../scala/org/apache/spark/util/AkkaUtils.scala |  17 ++-
 .../SparkContextSchedulerCreationSuite.scala    |   7 +-
 project/MimaExcludes.scala                      |   4 +
 .../spark/deploy/yarn/ApplicationMaster.scala   |  34 +++--
 .../spark/deploy/yarn/YarnAllocator.scala       |  51 ++++++-
 .../cluster/YarnClientSchedulerBackend.scala    |  19 +--
 .../cluster/YarnClusterSchedulerBackend.scala   |  12 +-
 12 files changed, 391 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e8fdfff..40ea369 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -294,7 +294,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   executorEnvs("SPARK_USER") = sparkUser
 
   // Create and start the scheduler
-  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
+  private[spark] var (schedulerBackend, taskScheduler) =
+    SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.actorSystem.actorOf(
     Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
@@ -856,6 +857,40 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     listenerBus.addListener(listener)
   }
 
+  /**
+   * :: DeveloperApi ::
+   * Request an additional number of executors from the cluster manager.
+   * This is currently only supported in Yarn mode.
+   */
+  @DeveloperApi
+  def requestExecutors(numAdditionalExecutors: Int): Unit = {
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors)
+      case _ => logWarning("Requesting executors is only supported in coarse-grained mode")
+    }
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Request that the cluster manager kill the specified executors.
+   * This is currently only supported in Yarn mode.
+   */
+  @DeveloperApi
+  def killExecutors(executorIds: Seq[String]): Unit = {
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds)
+      case _ => logWarning("Killing executors is only supported in coarse-grained mode")
+    }
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Request that the cluster manager kill the specified executor.
+   * This is currently only supported in Yarn mode.
+   */
+  @DeveloperApi
+  def killExecutor(executorId: String): Unit = killExecutors(Seq(executorId))
+
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION
 
@@ -1438,8 +1473,13 @@ object SparkContext extends Logging {
     res
   }
 
-  /** Creates a task scheduler based on a given master URL. Extracted for testing. */
-  private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
+  /**
+   * Create a task scheduler based on a given master URL.
+   * Return a 2-tuple of the scheduler backend and the task scheduler.
+   */
+  private def createTaskScheduler(
+      sc: SparkContext,
+      master: String): (SchedulerBackend, TaskScheduler) = {
     // Regular expression used for local[N] and local[*] master formats
     val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1461,7 +1501,7 @@ object SparkContext extends Logging {
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalBackend(scheduler, 1)
         scheduler.initialize(backend)
-        scheduler
+        (backend, scheduler)
 
       case LOCAL_N_REGEX(threads) =>
         def localCpuCount = Runtime.getRuntime.availableProcessors()
@@ -1470,7 +1510,7 @@ object SparkContext extends Logging {
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalBackend(scheduler, threadCount)
         scheduler.initialize(backend)
-        scheduler
+        (backend, scheduler)
 
       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
         def localCpuCount = Runtime.getRuntime.availableProcessors()
@@ -1480,14 +1520,14 @@ object SparkContext extends Logging {
         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
         val backend = new LocalBackend(scheduler, threadCount)
         scheduler.initialize(backend)
-        scheduler
+        (backend, scheduler)
 
       case SPARK_REGEX(sparkUrl) =>
         val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
         scheduler.initialize(backend)
-        scheduler
+        (backend, scheduler)
 
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
@@ -1507,7 +1547,7 @@ object SparkContext extends Logging {
         backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
           localCluster.stop()
         }
-        scheduler
+        (backend, scheduler)
 
       case "yarn-standalone" | "yarn-cluster" =>
         if (master == "yarn-standalone") {
@@ -1536,7 +1576,7 @@ object SparkContext extends Logging {
           }
         }
         scheduler.initialize(backend)
-        scheduler
+        (backend, scheduler)
 
       case "yarn-client" =>
         val scheduler = try {
@@ -1563,7 +1603,7 @@ object SparkContext extends Logging {
         }
 
         scheduler.initialize(backend)
-        scheduler
+        (backend, scheduler)
 
       case mesosUrl @ MESOS_REGEX(_) =>
         MesosNativeLibrary.load()
@@ -1576,13 +1616,13 @@ object SparkContext extends Logging {
           new MesosSchedulerBackend(scheduler, sc, url)
         }
         scheduler.initialize(backend)
-        scheduler
+        (backend, scheduler)
 
       case SIMR_REGEX(simrUrl) =>
         val scheduler = new TaskSchedulerImpl(sc)
         val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
         scheduler.initialize(backend)
-        scheduler
+        (backend, scheduler)
 
       case _ =>
         throw new SparkException("Could not parse Master URL: '" + master + "'")

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2b39c7f..cd3c015 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -34,7 +34,6 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.util.Utils
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import akka.actor.Props
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index fb8160a..1da6fe9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,7 +66,19 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
-  case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)
+  // Exchanged between the driver and the AM in Yarn client mode
+  case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase: String)
     extends CoarseGrainedClusterMessage
 
+  // Messages exchanged between the driver and the cluster manager for executor allocation
+  // In Yarn mode, these are exchanged between the driver and the AM
+
+  case object RegisterClusterManager extends CoarseGrainedClusterMessage
+
+  // Request executors by specifying the new total number of executors desired
+  // This includes executors already pending or running
+  case class RequestExecutors(requestedTotal: Int) extends CoarseGrainedClusterMessage
+
+  case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 59aed6b..7a6ee56 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -31,7 +31,6 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
-import org.apache.spark.ui.JettyUtils
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -42,7 +41,7 @@ import org.apache.spark.ui.JettyUtils
  * (spark.deploy.*).
  */
 private[spark]
-class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem: ActorSystem)
   extends SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
@@ -61,10 +60,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
 
+  private val executorDataMap = new HashMap[String, ExecutorData]
+
+  // Number of executors requested from the cluster manager that have not registered yet
+  private var numPendingExecutors = 0
+
+  // Executors we have requested the cluster manager to kill that have not died yet
+  private val executorsPendingToRemove = new HashSet[String]
+
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
     override protected def log = CoarseGrainedSchedulerBackend.this.log
     private val addressToExecutorId = new HashMap[Address, String]
-    private val executorDataMap = new HashMap[String, ExecutorData]
 
     override def preStart() {
      // Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -84,12 +90,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         } else {
           logInfo("Registered executor: " + sender + " with ID " + executorId)
           sender ! RegisteredExecutor
-          executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address,
-            Utils.parseHostPort(hostPort)._1, cores, cores))
 
           addressToExecutorId(sender.path.address) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
+          val (host, _) = Utils.parseHostPort(hostPort)
+          val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
+          // This must be synchronized because variables mutated
+          // in this block are read when requesting executors
+          CoarseGrainedSchedulerBackend.this.synchronized {
+            executorDataMap.put(executorId, data)
+            if (numPendingExecutors > 0) {
+              numPendingExecutors -= 1
+              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
+            }
+          }
           makeOffers()
         }
 
@@ -128,10 +143,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         removeExecutor(executorId, reason)
         sender ! true
 
-      case AddWebUIFilter(filterName, filterParams, proxyBase) =>
-        addWebUIFilter(filterName, filterParams, proxyBase)
-        sender ! true
-
       case DisassociatedEvent(_, address, _) =>
         addressToExecutorId.get(address).foreach(removeExecutor(_,
           "remote Akka client disassociated"))
@@ -183,13 +194,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
 
     // Remove a disconnected slave from the cluster
-    def removeExecutor(executorId: String, reason: String) {
+    def removeExecutor(executorId: String, reason: String): Unit = {
       executorDataMap.get(executorId) match {
         case Some(executorInfo) =>
-          executorDataMap -= executorId
+          // This must be synchronized because variables mutated
+          // in this block are read when requesting executors
+          CoarseGrainedSchedulerBackend.this.synchronized {
+            executorDataMap -= executorId
+            executorsPendingToRemove -= executorId
+          }
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           scheduler.executorLost(executorId, SlaveLost(reason))
-        case None => logError(s"Asked to remove non existant executor $executorId")
+        case None => logError(s"Asked to remove non-existent executor $executorId")
       }
     }
   }
@@ -274,21 +290,62 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     false
   }
 
-  // Add filters to the SparkUI
-  def addWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String) {
-    if (proxyBase != null && proxyBase.nonEmpty) {
-      System.setProperty("spark.ui.proxyBase", proxyBase)
-    }
+  /**
+   * Return the number of executors currently registered with this backend.
+   */
+  def numExistingExecutors: Int = executorDataMap.size
+
+  /**
+   * Request an additional number of executors from the cluster manager.
+   * Return whether the request is acknowledged.
+   */
+  final def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+    logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster
manager")
+    logDebug(s"Number of pending executors is now $numPendingExecutors")
+    numPendingExecutors += numAdditionalExecutors
+    // Account for executors pending to be added or removed
+    val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
+    doRequestTotalExecutors(newTotal)
+  }
 
-    val hasFilter = (filterName != null && filterName.nonEmpty &&
-      filterParams != null && filterParams.nonEmpty)
-    if (hasFilter) {
-      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
-      conf.set("spark.ui.filters", filterName)
-      filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
-      scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
+  /**
+   * Request executors from the cluster manager by specifying the total number desired,
+   * including existing pending and running executors.
+   *
+   * The semantics here guarantee that we do not over-allocate executors for this application,
+   * since a later request overrides the value of any prior request. The alternative interface
+   * of requesting a delta of executors risks double counting new executors when there are
+   * insufficient resources to satisfy the first request. We make the assumption here that the
+   * cluster manager will eventually fulfill all requests when resources free up.
+   *
+   * Return whether the request is acknowledged.
+   */
+  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
+
+  /**
+   * Request that the cluster manager kill the specified executors.
+   * Return whether the kill request is acknowledged.
+   */
+  final def killExecutors(executorIds: Seq[String]): Boolean = {
+    logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
+    val filteredExecutorIds = new ArrayBuffer[String]
+    executorIds.foreach { id =>
+      if (executorDataMap.contains(id)) {
+        filteredExecutorIds += id
+      } else {
+        logWarning(s"Executor to kill $id does not exist!")
+      }
     }
+    executorsPendingToRemove ++= filteredExecutorIds
+    doKillExecutors(filteredExecutorIds)
   }
+
+  /**
+   * Kill the given list of executors through the cluster manager.
+   * Return whether the kill request is acknowledged.
+   */
+  protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
+
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
new file mode 100644
index 0000000..50721b9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.ui.JettyUtils
+import org.apache.spark.util.AkkaUtils
+
+/**
+ * Abstract Yarn scheduler backend that contains common logic
+ * between the client and cluster Yarn scheduler backends.
+ */
+private[spark] abstract class YarnSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+    minRegisteredRatio = 0.8
+  }
+
+  protected var totalExpectedExecutors = 0
+
+  private val yarnSchedulerActor: ActorRef =
+    actorSystem.actorOf(
+      Props(new YarnSchedulerActor),
+      name = YarnSchedulerBackend.ACTOR_NAME)
+
+  private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)
+
+  /**
+   * Request executors from the ApplicationMaster by specifying the total number desired.
+   * This includes executors already pending or running.
+   */
+  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+    AkkaUtils.askWithReply[Boolean](
+      RequestExecutors(requestedTotal), yarnSchedulerActor, askTimeout)
+  }
+
+  /**
+   * Request that the ApplicationMaster kill the specified executors.
+   */
+  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+    AkkaUtils.askWithReply[Boolean](
+      KillExecutors(executorIds), yarnSchedulerActor, askTimeout)
+  }
+
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+  }
+
+  /**
+   * Add filters to the SparkUI.
+   */
+  private def addWebUIFilter(
+      filterName: String,
+      filterParams: Map[String, String],
+      proxyBase: String): Unit = {
+    if (proxyBase != null && proxyBase.nonEmpty) {
+      System.setProperty("spark.ui.proxyBase", proxyBase)
+    }
+
+    val hasFilter =
+      filterName != null && filterName.nonEmpty &&
+      filterParams != null && filterParams.nonEmpty
+    if (hasFilter) {
+      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+      conf.set("spark.ui.filters", filterName)
+      filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
+      scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
+    }
+  }
+
+  /**
+   * An actor that communicates with the ApplicationMaster.
+   */
+  private class YarnSchedulerActor extends Actor {
+    private var amActor: Option[ActorRef] = None
+
+    override def preStart(): Unit = {
+      // Listen for disassociation events
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    }
+
+    override def receive = {
+      case RegisterClusterManager =>
+        logInfo(s"ApplicationMaster registered as $sender")
+        amActor = Some(sender)
+
+      case r: RequestExecutors =>
+        amActor match {
+          case Some(actor) =>
+            sender ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout)
+          case None =>
+            logWarning("Attempted to request executors before the AM has registered!")
+            sender ! false
+        }
+
+      case k: KillExecutors =>
+        amActor match {
+          case Some(actor) =>
+            sender ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout)
+          case None =>
+            logWarning("Attempted to kill executors before the AM has registered!")
+            sender ! false
+        }
+
+      case AddWebUIFilter(filterName, filterParams, proxyBase) =>
+        addWebUIFilter(filterName, filterParams, proxyBase)
+        sender ! true
+
+      case d: DisassociatedEvent =>
+        if (amActor.isDefined && sender == amActor.get) {
+          logWarning(s"ApplicationMaster has disassociated: $d")
+        }
+    }
+  }
+}
+
+private[spark] object YarnSchedulerBackend {
+  val ACTOR_NAME = "YarnScheduler"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index f41c8d0..79e398e 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -159,17 +159,28 @@ private[spark] object AkkaUtils extends Logging {
   def askWithReply[T](
       message: Any,
       actor: ActorRef,
-      retryAttempts: Int,
+      timeout: FiniteDuration): T = {
+    askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, timeout)
+  }
+
+  /**
+   * Send a message to the given actor and get its result within a default timeout, or
+   * throw a SparkException if this fails even after the specified number of retries.
+   */
+  def askWithReply[T](
+      message: Any,
+      actor: ActorRef,
+      maxAttempts: Int,
       retryInterval: Int,
       timeout: FiniteDuration): T = {
     // TODO: Consider removing multiple attempts
     if (actor == null) {
-      throw new SparkException("Error sending message as driverActor is null " +
+      throw new SparkException("Error sending message as actor is null " +
         "[message = " + message + "]")
     }
     var attempts = 0
     var lastException: Exception = null
-    while (attempts < retryAttempts) {
+    while (attempts < maxAttempts) {
       attempts += 1
       try {
         val future = actor.ask(message)(timeout)

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 495a0d4..df237ba 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
 
-import org.apache.spark.scheduler.{TaskScheduler, TaskSchedulerImpl}
+import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
@@ -31,8 +31,9 @@ class SparkContextSchedulerCreationSuite
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
     val sc = new SparkContext("local", "test")
-    val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
-    val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
+    val createTaskSchedulerMethod =
+      PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
+    val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
     sched.asInstanceOf[TaskSchedulerImpl]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index adbdc5d..6a0495f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -73,6 +73,10 @@ object MimaExcludes {
               "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.api.java.JavaRDDLike.collectAsync")
+          ) ++ Seq(
+            // SPARK-3822
+            ProblemFilters.exclude[IncompatibleResultTypeProblem](
+              "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
           )
 
         case v if v.startsWith("1.1") =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e6fe026..6807379 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -36,8 +36,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
+import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
 
 /**
@@ -385,8 +385,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       SparkEnv.driverActorSystemName,
       driverHost,
       driverPort.toString,
-      CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+      YarnSchedulerBackend.ACTOR_NAME)
+    actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
   }
 
   /** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -479,9 +479,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     userThread
   }
 
-  // Actor used to monitor the driver when running in client deploy mode.
-  private class MonitorActor(driverUrl: String) extends Actor {
-
+  /**
+   * Actor that communicates with the driver in client deploy mode.
+   */
+  private class AMActor(driverUrl: String) extends Actor {
     var driver: ActorSelection = _
 
     override def preStart() = {
@@ -490,6 +491,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       // Send a hello message to establish the connection, after which
       // we can monitor Lifecycle Events.
       driver ! "Hello"
+      driver ! RegisterClusterManager
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     }
 
@@ -497,11 +499,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+
       case x: AddWebUIFilter =>
         logInfo(s"Add WebUI Filter. $x")
         driver ! x
-    }
 
+      case RequestExecutors(requestedTotal) =>
+        logInfo(s"Driver requested a total number of executors of $requestedTotal.")
+        Option(allocator) match {
+          case Some(a) => a.requestTotalExecutors(requestedTotal)
+          case None => logWarning("Container allocator is not ready to request executors
yet.")
+        }
+        sender ! true
+
+      case KillExecutors(executorIds) =>
+        logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
+        Option(allocator) match {
+          case Some(a) => executorIds.foreach(a.killExecutor)
+          case None => logWarning("Container allocator is not ready to kill executors
yet.")
+        }
+        sender ! true
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e1af8d5..7ae8ef2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -88,7 +88,10 @@ private[yarn] abstract class YarnAllocator(
   private val executorIdCounter = new AtomicInteger()
   private val numExecutorsFailed = new AtomicInteger()
 
-  private val maxExecutors = args.numExecutors
+  private var maxExecutors = args.numExecutors
+
+  // Keep track of which container is running which executor to remove the executors later
+  private val executorIdToContainer = new HashMap[String, Container]
 
   protected val executorMemory = args.executorMemory
   protected val executorCores = args.executorCores
@@ -111,7 +114,48 @@ private[yarn] abstract class YarnAllocator(
 
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
-  def allocateResources() = {
+  /**
+   * Request as many executors from the ResourceManager as needed to reach the desired total.
+   * This takes into account executors already running or pending.
+   */
+  def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+    val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
+    if (requestedTotal > currentTotal) {
+      maxExecutors += (requestedTotal - currentTotal)
+      // We need to call `allocateResources` here to avoid the following race condition:
+      // If we request executors twice before `allocateResources` is called, then we will end up
+      // double counting the number requested because `numPendingAllocate` is not updated yet.
+      allocateResources()
+    } else {
+      logInfo(s"Not allocating more executors because there are already $currentTotal " +
+        s"(application requested $requestedTotal total)")
+    }
+  }
+
+  /**
+   * Request that the ResourceManager release the container running the specified executor.
+   */
+  def killExecutor(executorId: String): Unit = synchronized {
+    if (executorIdToContainer.contains(executorId)) {
+      val container = executorIdToContainer.remove(executorId).get
+      internalReleaseContainer(container)
+      numExecutorsRunning.decrementAndGet()
+      maxExecutors -= 1
+      assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
+    } else {
+      logWarning(s"Attempted to kill unknown executor $executorId!")
+    }
+  }
+
+  /**
+   * Allocate missing containers based on the number of executors currently pending and running.
+   *
+   * This method prioritizes the allocated container responses from the RM based on node and
+   * rack locality. Additionally, it releases any extra containers allocated for this application
+   * but are not needed. This must be synchronized because variables read in this block are
+   * mutated by other methods.
+   */
+  def allocateResources(): Unit = synchronized {
     val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
 
     // this is needed by alpha, do it here since we add numPending right after this
@@ -119,7 +163,7 @@ private[yarn] abstract class YarnAllocator(
     if (missing > 0) {
       val totalExecutorMemory = executorMemory + memoryOverhead
       numPendingAllocate.addAndGet(missing)
-      logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory
MB " + 
+      logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory
MB " +
         s"memory including $memoryOverhead MB overhead")
     } else {
       logDebug("Empty allocation request ...")
@@ -269,6 +313,7 @@ private[yarn] abstract class YarnAllocator(
             CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
           logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+          executorIdToContainer(executorId) = container
 
           // To be safe, remove the container from `releasedContainers`.
           releasedContainers.remove(containerId)

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 59b2b47..f6f6dc5 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -17,27 +17,23 @@
 
 package org.apache.spark.scheduler.cluster
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+
 import org.apache.spark.{SparkException, Logging, SparkContext}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
-import scala.collection.mutable.ArrayBuffer
-
 private[spark] class YarnClientSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  extends YarnSchedulerBackend(scheduler, sc)
   with Logging {
 
-  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
-    minRegisteredRatio = 0.8
-  }
-
   private var client: Client = null
   private var appId: ApplicationId = null
   private var stopping: Boolean = false
-  private var totalExpectedExecutors = 0
 
   /**
    * Create a Yarn client to submit an application to the ResourceManager.
@@ -151,14 +147,11 @@ private[spark] class YarnClientSchedulerBackend(
     logInfo("Stopped")
   }
 
-  override def sufficientResourcesRegistered(): Boolean = {
-    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
-  }
-
-  override def applicationId(): String =
+  override def applicationId(): String = {
     Option(appId).map(_.toString).getOrElse {
       logWarning("Application ID is not initialized yet.")
       super.applicationId
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1df05a40/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 3a186cf..a96a54f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -25,13 +25,7 @@ import org.apache.spark.util.IntParam
 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
-
-  var totalExpectedExecutors = 0
-
-  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
-    minRegisteredRatio = 0.8
-  }
+  extends YarnSchedulerBackend(scheduler, sc) {
 
   override def start() {
     super.start()
@@ -44,10 +38,6 @@ private[spark] class YarnClusterSchedulerBackend(
     totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
   }
 
-  override def sufficientResourcesRegistered(): Boolean = {
-    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
-  }
-
   override def applicationId(): String =
     // In YARN Cluster mode, spark.yarn.app.id is expect to be set
     // before user application is launched.



