spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-8059] [YARN] Wake up allocation thread when new requests arrive.
Date Wed, 03 Jun 2015 21:59:34 GMT
Repository: spark
Updated Branches:
  refs/heads/master bfbf12b34 -> aa40c4420


[SPARK-8059] [YARN] Wake up allocation thread when new requests arrive.

This should help reduce latency for new executor allocations.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #6600 from vanzin/SPARK-8059 and squashes the following commits:

8387a3a [Marcelo Vanzin] [SPARK-8059] [yarn] Wake up allocation thread when new requests arrive.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa40c442
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa40c442
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa40c442

Branch: refs/heads/master
Commit: aa40c4420717aa06a7964bd30b428fb73548beb2
Parents: bfbf12b
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Wed Jun 3 14:59:30 2015 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Wed Jun 3 14:59:30 2015 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala       | 16 +++++++++++++---
 .../apache/spark/deploy/yarn/YarnAllocator.scala    |  7 ++++++-
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa40c442/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 760e458..002d7b6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -67,6 +67,7 @@ private[spark] class ApplicationMaster(
 
   @volatile private var reporterThread: Thread = _
   @volatile private var allocator: YarnAllocator = _
+  private val allocatorLock = new Object()
 
   // Fields used in client mode.
   private var rpcEnv: RpcEnv = null
@@ -359,7 +360,9 @@ private[spark] class ApplicationMaster(
               }
             logDebug(s"Number of pending allocations is $numPendingAllocate. " +
                      s"Sleeping for $sleepInterval.")
-            Thread.sleep(sleepInterval)
+            allocatorLock.synchronized {
+              allocatorLock.wait(sleepInterval)
+            }
           } catch {
             case e: InterruptedException =>
           }
@@ -546,8 +549,15 @@ private[spark] class ApplicationMaster(
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case RequestExecutors(requestedTotal) =>
         Option(allocator) match {
-          case Some(a) => a.requestTotalExecutors(requestedTotal)
-          case None => logWarning("Container allocator is not ready to request executors yet.")
+          case Some(a) =>
+            allocatorLock.synchronized {
+              if (a.requestTotalExecutors(requestedTotal)) {
+                allocatorLock.notifyAll()
+              }
+            }
+
+          case None =>
+            logWarning("Container allocator is not ready to request executors yet.")
         }
         context.reply(true)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/aa40c442/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 21193e7..940873f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -146,11 +146,16 @@ private[yarn] class YarnAllocator(
    * Request as many executors from the ResourceManager as needed to reach the desired total. If
    * the requested total is smaller than the current number of running executors, no executors will
    * be killed.
+   *
+   * @return Whether the new requested total is different than the old value.
    */
-  def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+  def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized {
     if (requestedTotal != targetNumExecutors) {
       logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
       targetNumExecutors = requestedTotal
+      true
+    } else {
+      false
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message