spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-21377][YARN] Make jars specified with --jars/--packages loadable in AM's credential renewer
Date Mon, 17 Jul 2017 20:11:35 GMT
Repository: spark
Updated Branches:
  refs/heads/master 0e07a29cf -> 53465075c


[SPARK-21377][YARN] Make jars specified with --jars/--packages loadable in AM's credential renewer

## What changes were proposed in this pull request?

In this issue we have a long-running Spark application with secure HBase, which requires
`HBaseCredentialProvider` to obtain tokens periodically. We specify the HBase-related jars with
`--packages`, but these dependencies are not added to the AM classpath, so when
`HBaseCredentialProvider` tries to initialize HBase connections to get tokens, it fails.

Currently, because jars specified with `--jars` or `--packages` are not added to the AM classpath,
the only way to extend the AM classpath is to use "spark.driver.extraClassPath", which is supposed
to be used in yarn cluster mode.

So in this fix, we propose to create the user classloader once and reuse it for
`AMCredentialRenewer`, so that these jars can be loaded when it acquires new tokens.
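
The classloader approach described above can be illustrated with a minimal, self-contained sketch.
It is not the Spark code: `ContextClassLoaderSketch` and `runWithContextClassLoader` are
hypothetical names, and the empty `URLClassLoader` only stands in for a loader built from the
`--jars`/`--packages` entries. It shows a short-lived thread whose context classloader is set
before it starts, and which the caller joins before continuing.

```scala
import java.net.{URL, URLClassLoader}
import java.util.concurrent.atomic.AtomicReference

object ContextClassLoaderSketch {
  // Hypothetical helper (not part of Spark): run `body` on a short-lived thread whose
  // context classloader is `loader`, wait for it to finish, and return its result.
  def runWithContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val result = new AtomicReference[Either[Throwable, T]]()
    val starter = new Thread {
      setName("starter-sketch")
      setContextClassLoader(loader)

      override def run(): Unit = {
        // Capture either the value or the failure so the caller can see both.
        result.set(try Right(body) catch { case t: Throwable => Left(t) })
      }
    }
    starter.start()
    starter.join()
    result.get() match {
      case Right(value) => value
      case Left(error) => throw error
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the user classloader; a real one would list the user jar URLs.
    val userLoader = new URLClassLoader(Array.empty[URL], getClass.getClassLoader)
    val seen = runWithContextClassLoader(userLoader) {
      Thread.currentThread().getContextClassLoader
    }
    println(seen eq userLoader)
  }
}
```

Threads created from inside `body` inherit the same context classloader by default, which is
presumably why a starter thread is enough for the renewer's later work.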

This patch also fixes an issue where the AM cannot get tokens from HDFS: the FileSystem is
obtained before the Kerberos login, so using this FS to get tokens throws an exception.

## How was this patch tested?

Manual verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #18616 from jerryshao/SPARK-21377.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53465075
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53465075
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53465075

Branch: refs/heads/master
Commit: 53465075c5d7bf2a52b6af0d682219285ee219c6
Parents: 0e07a29
Author: jerryshao <sshao@hortonworks.com>
Authored: Mon Jul 17 13:11:30 2017 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Mon Jul 17 13:11:30 2017 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 59 +++++++++++++-------
 1 file changed, 38 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/53465075/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4868180..ce290c3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -90,6 +90,23 @@ private[spark] class ApplicationMaster(
   @volatile private var reporterThread: Thread = _
   @volatile private var allocator: YarnAllocator = _
 
+  private val userClassLoader = {
+    val classpath = Client.getUserClasspath(sparkConf)
+    val urls = classpath.map { entry =>
+      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+    }
+
+    if (isClusterMode) {
+      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
+        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+      } else {
+        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+      }
+    } else {
+      new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+    }
+  }
+
   // Lock for controlling the allocator (heartbeat) thread.
   private val allocatorLock = new Object()
 
@@ -242,16 +259,27 @@ private[spark] class ApplicationMaster(
 
       // If the credentials file config is present, we must periodically renew tokens. So create
       // a new AMDelegationTokenRenewer
-      if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
-        // If a principal and keytab have been set, use that to create new credentials for executors
-        // periodically
-        val credentialManager = new YARNHadoopDelegationTokenManager(
-          sparkConf,
-          yarnConf,
-          YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
-
-        val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
-        credentialRenewer.scheduleLoginFromKeytab()
+      if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
+        // Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
+        // classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
+        val credentialRenewerThread = new Thread {
+          setName("AMCredentialRenewerStarter")
+          setContextClassLoader(userClassLoader)
+
+          override def run(): Unit = {
+            val credentialManager = new YARNHadoopDelegationTokenManager(
+              sparkConf,
+              yarnConf,
+              YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
+
+            val credentialRenewer =
+              new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
+            credentialRenewer.scheduleLoginFromKeytab()
+          }
+        }
+
+        credentialRenewerThread.start()
+        credentialRenewerThread.join()
       }
 
       if (isClusterMode) {
@@ -609,17 +637,6 @@ private[spark] class ApplicationMaster(
   private def startUserApplication(): Thread = {
     logInfo("Starting the user application in a separate Thread")
 
-    val classpath = Client.getUserClasspath(sparkConf)
-    val urls = classpath.map { entry =>
-      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
-    }
-    val userClassLoader =
-      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
-        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
-      } else {
-        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
-      }
-
     var userArgs = args.userArgs
     if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
       // When running pyspark, the app is run using PythonRunner. The second argument is the list


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

