spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-24149][YARN] Retrieve all federated namespaces tokens
Date Fri, 18 May 2018 20:04:08 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7f82c4a47 -> 3159ee085


[SPARK-24149][YARN] Retrieve all federated namespaces tokens

## What changes were proposed in this pull request?

Hadoop 3 introduces HDFS federation, which allows multiple namespaces to coexist on the
same HDFS cluster. Spark therefore needs to request a delegation token from the namenode of
each namespace; otherwise, accessing any namespace other than the default one (for which we
already fetch a delegation token) fails.

This PR adds automatic discovery of the namenodes for all the available namespaces, based on
the configuration in hdfs-site.xml. A sketch of the relevant configuration keys follows.
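
For reference, the discovery reads standard HDFS federation keys. Here is a minimal sketch of
the kind of setup involved, expressed as Hadoop `Configuration` calls mirroring hdfs-site.xml
entries (namespace names, hosts, and ports are hypothetical, not taken from this patch):

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical two-namespace federation layout.
val hadoopConf = new Configuration()
hadoopConf.set("dfs.nameservices", "ns1,ns2")
// ns1 has no HA: a single namenode RPC address keyed by the nameservice.
hadoopConf.set("dfs.namenode.rpc-address.ns1", "nn1.example.com:8020")
// ns2 has HA: clients address the nameservice itself (hdfs://ns2), and the
// individual namenodes are listed under dfs.ha.namenodes.ns2.
hadoopConf.set("dfs.ha.namenodes.ns2", "nn1,nn2")
hadoopConf.set("dfs.namenode.rpc-address.ns2.nn1", "nn2a.example.com:8020")
hadoopConf.set("dfs.namenode.rpc-address.ns2.nn2", "nn2b.example.com:8020")
```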

## How was this patch tested?

Manual tests in a dockerized environment.

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21216 from mgaido91/SPARK-24149.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3159ee08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3159ee08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3159ee08

Branch: refs/heads/master
Commit: 3159ee085b23e2e9f1657d80b7ae3efe82b5edb9
Parents: 7f82c4a
Author: Marco Gaido <marcogaido91@gmail.com>
Authored: Fri May 18 13:04:00 2018 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Fri May 18 13:04:00 2018 -0700

----------------------------------------------------------------------
 docs/running-on-yarn.md                         |  9 ++-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 24 +++++++-
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  | 65 +++++++++++++++++++-
 3 files changed, 93 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3159ee08/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index c9e68c3..4dbcbea 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -424,9 +424,12 @@ To use a custom metrics.properties for the application master and executors, upd
 
 Standard Kerberos support in Spark is covered in the [Security](security.html#kerberos) page.
 
-In YARN mode, when accessing Hadoop file systems, aside from the service hosting the user's home
-directory, Spark will also automatically obtain delegation tokens for the service hosting the
-staging directory of the Spark application.
+In YARN mode, when accessing Hadoop filesystems, Spark will automatically obtain delegation tokens
+for:
+
+- the filesystem hosting the staging directory of the Spark application (which is the default
+  filesystem if `spark.yarn.stagingDir` is not set);
+- if Hadoop federation is enabled, all the federated filesystems in the configuration.
 
 If an application needs to interact with other secure Hadoop filesystems, their URIs need to be
 explicitly provided to Spark at launch time. This is done by listing them in the
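
(As a usage note alongside this doc change: a minimal sketch of explicitly listing extra secure
filesystems at launch time, assuming the `spark.yarn.access.hadoopFileSystems` property from the
same docs page; the URIs are illustrative only.)

```scala
import org.apache.spark.SparkConf

// Hypothetical launch-time configuration listing additional secure filesystems
// whose delegation tokens Spark should also obtain.
val sparkConf = new SparkConf()
  .set("spark.yarn.access.hadoopFileSystems",
    "hdfs://other-nn.example.com:8020,webhdfs://archive.example.com:50070")
```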

http://git-wip-us.apache.org/repos/asf/spark/blob/3159ee08/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 8eda6cb..7250e58 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -200,7 +200,29 @@ object YarnSparkHadoopUtil {
       .map(new Path(_).getFileSystem(hadoopConf))
       .getOrElse(FileSystem.get(hadoopConf))
 
-    filesystemsToAccess + stagingFS
+    // Add the list of available namenodes for all namespaces in HDFS federation.
+    // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
+    // namespaces.
+    val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+      Set.empty
+    } else {
+      val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
+      // Retrieving the filesystem for the nameservices where HA is not enabled
+      val filesystemsWithoutHA = nameservices.flatMap { ns =>
+        Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode =>
+          new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf)
+        }
+      }
+      // Retrieving the filesystem for the nameservices where HA is enabled
+      val filesystemsWithHA = nameservices.flatMap { ns =>
+        Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ =>
+          new Path(s"hdfs://$ns").getFileSystem(hadoopConf)
+        }
+      }
+      (filesystemsWithoutHA ++ filesystemsWithHA).toSet
+    }
+
+    filesystemsToAccess ++ hadoopFilesystems + stagingFS
   }
 
 }
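
(Grounded in the test suite below, a minimal sketch of how the helper is invoked; the returned
set is what Spark fetches delegation tokens for: the staging filesystem, any explicitly listed
filesystems, and the namenodes discovered above.)

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// hadoopConf would normally carry the cluster's hdfs-site.xml settings.
val sparkConf = new SparkConf()
val hadoopConf = new Configuration()
val fss: Set[FileSystem] = YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, hadoopConf)
```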

http://git-wip-us.apache.org/repos/asf/spark/blob/3159ee08/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index f21353a..61c0c43 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -21,7 +21,8 @@ import java.io.{File, IOException}
 import java.nio.charset.StandardCharsets
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.io.Text
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.Matchers
@@ -141,4 +142,66 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
 
   }
 
+  test("SPARK-24149: retrieve all namenodes from HDFS") {
+    val sparkConf = new SparkConf()
+    val basicFederationConf = new Configuration()
+    basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
+    basicFederationConf.set("dfs.nameservices", "ns1,ns2")
+    basicFederationConf.set("dfs.namenode.rpc-address.ns1", "localhost:8020")
+    basicFederationConf.set("dfs.namenode.rpc-address.ns2", "localhost:8021")
+    val basicFederationExpected = Set(
+      new Path("hdfs://localhost:8020").getFileSystem(basicFederationConf),
+      new Path("hdfs://localhost:8021").getFileSystem(basicFederationConf))
+    val basicFederationResult = YarnSparkHadoopUtil.hadoopFSsToAccess(
+      sparkConf, basicFederationConf)
+    basicFederationResult should be (basicFederationExpected)
+
+    // when viewfs is enabled, namespaces are handled by it, so we don't need to take care of them
+    val viewFsConf = new Configuration()
+    viewFsConf.addResource(basicFederationConf)
+    viewFsConf.set("fs.defaultFS", "viewfs://clusterX/")
+    viewFsConf.set("fs.viewfs.mounttable.clusterX.link./home", "hdfs://localhost:8020/")
+    val viewFsExpected = Set(new Path("viewfs://clusterX/").getFileSystem(viewFsConf))
+    YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, viewFsConf) should be (viewFsExpected)
+
+    // invalid config should not throw NullPointerException
+    val invalidFederationConf = new Configuration()
+    invalidFederationConf.addResource(basicFederationConf)
+    invalidFederationConf.unset("dfs.namenode.rpc-address.ns2")
+    val invalidFederationExpected = Set(
+      new Path("hdfs://localhost:8020").getFileSystem(invalidFederationConf))
+    val invalidFederationResult = YarnSparkHadoopUtil.hadoopFSsToAccess(
+      sparkConf, invalidFederationConf)
+    invalidFederationResult should be (invalidFederationExpected)
+
+    // no namespaces defined, i.e. the old case
+    val noFederationConf = new Configuration()
+    noFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
+    val noFederationExpected = Set(
+      new Path("hdfs://localhost:8020").getFileSystem(noFederationConf))
+    val noFederationResult = YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, noFederationConf)
+    noFederationResult should be (noFederationExpected)
+
+    // federation and HA enabled
+    val federationAndHAConf = new Configuration()
+    federationAndHAConf.set("fs.defaultFS", "hdfs://clusterXHA")
+    federationAndHAConf.set("dfs.nameservices", "clusterXHA,clusterYHA")
+    federationAndHAConf.set("dfs.ha.namenodes.clusterXHA", "x-nn1,x-nn2")
+    federationAndHAConf.set("dfs.ha.namenodes.clusterYHA", "y-nn1,y-nn2")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn1", "localhost:8020")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn2", "localhost:8021")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn1", "localhost:8022")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn2", "localhost:8023")
+    federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterXHA",
+      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
+    federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterYHA",
+      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
+
+    val federationAndHAExpected = Set(
+      new Path("hdfs://clusterXHA").getFileSystem(federationAndHAConf),
+      new Path("hdfs://clusterYHA").getFileSystem(federationAndHAConf))
+    val federationAndHAResult = YarnSparkHadoopUtil.hadoopFSsToAccess(
+      sparkConf, federationAndHAConf)
+    federationAndHAResult should be (federationAndHAExpected)
+  }
 }


