spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sa...@apache.org
Subject spark git commit: [SPARK-6470] [YARN] Add support for YARN node labels.
Date Mon, 11 May 2015 19:09:52 GMT
Repository: spark
Updated Branches:
  refs/heads/master 0a4844f90 -> 82fee9d9a


[SPARK-6470] [YARN] Add support for YARN node labels.

This is difficult to write a test for because it relies on the latest version of YARN, but
I verified manually that the patch does pass along the label expression on this version and
containers are successfully launched.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #5242 from sryza/sandy-spark-6470 and squashes the following commits:

6af87b9 [Sandy Ryza] Change info to warning
6e22d99 [Sandy Ryza] [YARN] SPARK-6470.  Add support for YARN node labels.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82fee9d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82fee9d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82fee9d9

Branch: refs/heads/master
Commit: 82fee9d9aad2c9ba2fb4bd658579fe99218cafac
Parents: 0a4844f
Author: Sandy Ryza <sandy@cloudera.com>
Authored: Mon May 11 12:09:39 2015 -0700
Committer: Sandy Ryza <sandy@cloudera.com>
Committed: Mon May 11 12:09:39 2015 -0700

----------------------------------------------------------------------
 docs/running-on-yarn.md                         |  9 ++++++
 .../spark/deploy/yarn/YarnAllocator.scala       | 31 +++++++++++++++++++-
 2 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/82fee9d9/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4fb4a90..51c1339 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -220,6 +220,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   Otherwise, the client process will exit after submission.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.executor.nodeLabelExpression</code></td>
+  <td>(none)</td>
+  <td>
+  A YARN node label expression that restricts the set of nodes executors will be scheduled on.
+  Only versions of YARN greater than or equal to 2.6 support node label expressions, so when
+  running against earlier versions, this property will be ignored.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN

http://git-wip-us.apache.org/repos/asf/spark/blob/82fee9d9/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 88d68d5..8a08f56 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -117,6 +117,24 @@ private[yarn] class YarnAllocator(
   // For testing
   private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
 
+  private val labelExpression = sparkConf.getOption("spark.yarn.executor.nodeLabelExpression")
+
+  // ContainerRequest constructor that can take a node label expression. We grab it through
+  // reflection because it's only available in later versions of YARN.
+  private val nodeLabelConstructor = labelExpression.flatMap { expr =>
+    try {
+      Some(classOf[ContainerRequest].getConstructor(classOf[Resource],
+        classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
+        classOf[String]))
+    } catch {
+      case e: NoSuchMethodException => {
+        logWarning(s"Node label expression $expr will be ignored because YARN version on" +
+          " classpath does not support it.")
+        None
+      }
+    }
+  }
+
   def getNumExecutorsRunning: Int = numExecutorsRunning
 
   def getNumExecutorsFailed: Int = numExecutorsFailed
@@ -211,7 +229,7 @@ private[yarn] class YarnAllocator(
         s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
 
       for (i <- 0 until missing) {
-        val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
+        val request = createContainerRequest(resource)
         amClient.addContainerRequest(request)
         val nodes = request.getNodes
         val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
@@ -231,6 +249,17 @@ private[yarn] class YarnAllocator(
   }
 
   /**
+   * Creates a container request, handling the reflection required to use YARN features that were
+   * added in recent versions.
+   */
+  private def createContainerRequest(resource: Resource): ContainerRequest = {
+    nodeLabelConstructor.map { constructor =>
+      constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
+        labelExpression.orNull)
+    }.getOrElse(new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY))
+  }
+
+  /**
    * Handle containers granted by the RM by launching executors on them.
    *
    * Due to the way the YARN allocation protocol works, certain healthy race conditions can result


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message