spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From van...@apache.org
Subject spark git commit: [SPARK-21376][YARN] Fix yarn client token expire issue when cleaning the staging files in long running scenario
Date Thu, 13 Jul 2017 22:25:44 GMT
Repository: spark
Updated Branches:
  refs/heads/master 5c8edfc4a -> cb8d5cc90


[SPARK-21376][YARN] Fix yarn client token expire issue when cleaning the staging files in
long running scenario

## What changes were proposed in this pull request?

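This issue happens in long-running applications in yarn cluster mode. Because yarn#client
doesn't sync tokens with the AM, it always keeps the initial token, which may have expired
in a long-running scenario. So when yarn#client tries to clean up the staging directory after
the application has finished, it uses this expired token and hits a token expiration error.
The fix: in cluster mode with a principal and keytab configured, yarn#client logs in again
from the keytab and performs the staging directory cleanup as that newly logged-in user.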

## How was this patch tested?

Manual verification in a secure cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #18617 from jerryshao/SPARK-21376.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb8d5cc9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb8d5cc9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb8d5cc9

Branch: refs/heads/master
Commit: cb8d5cc90ff8d3c991ff33da41b136ab7634f71b
Parents: 5c8edfc
Author: jerryshao <sshao@hortonworks.com>
Authored: Thu Jul 13 15:25:38 2017 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Thu Jul 13 15:25:38 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 35 +++++++++++++++-----
 1 file changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb8d5cc9/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7caaa91..a5b0e19 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.security.PrivilegedExceptionAction
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -192,16 +193,32 @@ private[spark] class Client(
    * Cleanup application staging directory.
    */
   private def cleanupStagingDir(appId: ApplicationId): Unit = {
-    val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
-    try {
-      val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-      val fs = stagingDirPath.getFileSystem(hadoopConf)
-      if (!preserveFiles && fs.delete(stagingDirPath, true)) {
-        logInfo(s"Deleted staging directory $stagingDirPath")
+    if (sparkConf.get(PRESERVE_STAGING_FILES)) {
+      return
+    }
+
+    def cleanupStagingDirInternal(): Unit = {
+      val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+      try {
+        val fs = stagingDirPath.getFileSystem(hadoopConf)
+        if (fs.delete(stagingDirPath, true)) {
+          logInfo(s"Deleted staging directory $stagingDirPath")
+        }
+      } catch {
+        case ioe: IOException =>
+          logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
       }
-    } catch {
-      case ioe: IOException =>
-        logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+
+    if (isClusterMode && principal != null && keytab != null) {
+      val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+      newUgi.doAs(new PrivilegedExceptionAction[Unit] {
+        override def run(): Unit = {
+          cleanupStagingDirInternal()
+        }
+      })
+    } else {
+      cleanupStagingDirInternal()
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message