spark-commits mailing list archives

From tgraves@apache.org
Subject git commit: SPARK-1528 - spark on yarn, add support for accessing remote HDFS
Date Tue, 05 Aug 2014 17:49:34 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 b92a45058 -> 6c0c65fc8


SPARK-1528 - spark on yarn, add support for accessing remote HDFS

Add a config (spark.yarn.access.namenodes) to allow applications running on YARN to access
other secure HDFS clusters. The user just specifies the namenodes of the other clusters, and
we get tokens for those and ship them with the Spark application.
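
A minimal usage sketch, assuming hypothetical namenode hosts nn1.com and nn2.com
(illustrative values, not taken from this commit or any real deployment):

    import org.apache.spark.SparkConf

    // Hypothetical example: list the remote secure namenodes (comma-separated)
    // so that delegation tokens for them are fetched at submit time.
    val conf = new SparkConf()
      .setAppName("remote-hdfs-example")
      .set("spark.yarn.access.namenodes", "hdfs://nn1.com:8032,hdfs://nn2.com:8032")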

Author: Thomas Graves <tgraves@apache.org>

Closes #1159 from tgravescs/spark-1528 and squashes the following commits:

ddbcd16 [Thomas Graves] review comments
0ac8501 [Thomas Graves] SPARK-1528 - add support for accessing remote HDFS

(cherry picked from commit 2c0f705e26ca3dfc43a1e9a0722c0e57f67c970a)
Signed-off-by: Thomas Graves <tgraves@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c0c65fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c0c65fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c0c65fc

Branch: refs/heads/branch-1.1
Commit: 6c0c65fc85677ab2cae819a546ea50ed660994c3
Parents: b92a450
Author: Thomas Graves <tgraves@apache.org>
Authored: Tue Aug 5 12:48:26 2014 -0500
Committer: Thomas Graves <tgraves@apache.org>
Committed: Tue Aug 5 12:48:51 2014 -0500

----------------------------------------------------------------------
 docs/running-on-yarn.md                         |  7 +++
 .../apache/spark/deploy/yarn/ClientBase.scala   | 56 ++++++++++++++------
 .../spark/deploy/yarn/ClientBaseSuite.scala     | 55 ++++++++++++++++++-
 3 files changed, 101 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c0c65fc/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 0362f5a..573930d 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -106,6 +106,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     set this configuration to "hdfs:///some/path".
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.access.namenodes</code></td>
+  <td>(none)</td>
+  <td>
+    A list of secure HDFS namenodes your Spark application is going to access. For example,
+    `spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032`. The Spark application
+    must have access to the namenodes listed and Kerberos must be properly configured to be able
+    to access them (either in the same realm or in a trusted realm). Spark acquires security tokens
+    for each of the namenodes so that the Spark application can access those remote HDFS clusters.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN

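As a hedged follow-on sketch (hosts and paths are hypothetical): once the config above
is set and the tokens have been shipped, reading from the remote secure cluster needs no
application-side changes beyond using the full hdfs:// URI:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch under the assumption that spark.yarn.access.namenodes already
    // lists hdfs://nn2.com:8032; the read below then works against that
    // secure remote cluster without extra Kerberos plumbing in user code.
    val sc = new SparkContext(new SparkConf())
    val lines = sc.textFile("hdfs://nn2.com:8032/data/input.txt")  // hypothetical path
    println(lines.count())
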
http://git-wip-us.apache.org/repos/asf/spark/blob/6c0c65fc/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index b7e8636..ed8f56a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.util.StringUtils
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -191,23 +191,11 @@ trait ClientBase extends Logging {
     // Upload Spark and the application JAR to the remote file system if necessary. Add them as
     // local resources to the application master.
     val fs = FileSystem.get(conf)
-
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    if (UserGroupInformation.isSecurityEnabled()) {
-      if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
-        logError(errorMessage)
-        throw new SparkException(errorMessage)
-      }
-    }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      val dstFs = dst.getFileSystem(conf)
-      dstFs.addDelegationTokens(delegTokenRenewer, credentials)
-    }
+    val nns = ClientBase.getNameNodesToAccess(sparkConf) + dst
+    ClientBase.obtainTokensForNamenodes(nns, conf, credentials)
 
+    val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
     val localResources = HashMap[String, LocalResource]()
     FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
 
@@ -614,4 +602,40 @@ object ClientBase extends Logging {
     YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
             File.pathSeparator)
 
+  /** 
+   * Get the list of namenodes the user may access.
+   */
+  private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+    sparkConf.get("spark.yarn.access.namenodes", "").split(",").map(_.trim()).filter(!_.isEmpty)
+      .map(new Path(_)).toSet
+  }
+
+  private[yarn] def getTokenRenewer(conf: Configuration): String = {
+    val delegTokenRenewer = Master.getMasterPrincipal(conf)
+    logDebug("delegation token renewer is: " + delegTokenRenewer)
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+      logError(errorMessage)
+      throw new SparkException(errorMessage)
+    }
+    delegTokenRenewer
+  }
+
+  /**
+   * Obtains tokens for the namenodes passed in and adds them to the credentials.
+   */
+  private[yarn] def obtainTokensForNamenodes(paths: Set[Path], conf: Configuration,
+    creds: Credentials) {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      val delegTokenRenewer = getTokenRenewer(conf)
+
+      paths.foreach {
+        dst =>
+          val dstFs = dst.getFileSystem(conf)
+          logDebug("getting token for namenode: " + dst)
+          dstFs.addDelegationTokens(delegTokenRenewer, creds)
+      }
+    }
+  }
+
 }

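For orientation, a hedged sketch of how the two new helpers compose, mirroring the call
site in the hunk above. Note that both helpers are private[yarn], so this only compiles
inside the org.apache.spark.deploy.yarn package; the Credentials and Configuration
instances are assumed to come from the surrounding client code:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.security.Credentials
    import org.apache.spark.SparkConf

    // Sketch only: parse the configured namenodes into Paths, then fetch a
    // delegation token for each (a no-op when security is disabled).
    val nns: Set[Path] = ClientBase.getNameNodesToAccess(new SparkConf())
    ClientBase.obtainTokensForNamenodes(nns, new Configuration(), new Credentials())
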
http://git-wip-us.apache.org/repos/asf/spark/blob/6c0c65fc/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
index 686714d..68cc289 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Matchers._
 import org.mockito.Mockito._
+
+
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 
@@ -38,7 +40,7 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.{ HashMap => MutableHashMap }
 import scala.util.Try
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkException, SparkConf}
 import org.apache.spark.util.Utils
 
 class ClientBaseSuite extends FunSuite with Matchers {
@@ -138,6 +140,57 @@ class ClientBaseSuite extends FunSuite with Matchers {
     }
   }
 
+  test("check access nns empty") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set())
+  }
+
+  test("check access nns unset") {
+    val sparkConf = new SparkConf()
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set())
+  }
+
+  test("check access nns") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032")))
+  }
+
+  test("check access nns space") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032")))
+  }
+
+  test("check access two nns") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
+  }
+
+  test("check token renewer") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+    hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+    val renewer = ClientBase.getTokenRenewer(hadoopConf)
+    renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+  }
+
+  test("check token renewer default") {
+    val hadoopConf = new Configuration()
+    val caught =
+      intercept[SparkException] {
+        ClientBase.getTokenRenewer(hadoopConf)
+      }
+    assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

