spark-reviews mailing list archives

From: harishreedharan <...@git.apache.org>
Subject: [GitHub] spark pull request: [SPARK-5342][YARN] Allow long running Spark ap...
Date: Fri, 06 Mar 2015 00:24:01 GMT
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4688#discussion_r25916219
  
    --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
    @@ -82,6 +102,180 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
         if (credentials != null) credentials.getSecretKey(new Text(key)) else null
       }
     
    +  private[spark] override def scheduleLoginFromKeytab(): Unit = {
    +    val principal = System.getenv("SPARK_PRINCIPAL")
    +    val keytab = System.getenv("SPARK_KEYTAB")
    +    if (principal != null) {
    +      val delegationTokenRenewerThread =
    +        new Runnable {
    +          override def run(): Unit = {
    +            if (!loggedInViaKeytab) {
    +              // Keytab is copied by YARN to the working directory of the AM, so full path is
    +              // not needed.
    +              loggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
    +                principal, keytab)
    +              loggedInViaKeytab = true
    +            }
    +            val nns = getNameNodesToAccess(sparkConf)
    +            val newCredentials = loggedInUGI.getCredentials
    +            obtainTokensForNamenodes(nns, conf, newCredentials)
    +            val remoteFs = FileSystem.get(conf)
    +            val stagingDirPath =
    +              new Path(remoteFs.getHomeDirectory, System.getenv("SPARK_YARN_STAGING_DIR"))
    +            val tokenPathStr = sparkConf.get("spark.yarn.credentials.file")
    +            val tokenPath = new Path(stagingDirPath.toString, tokenPathStr)
    +            val tempTokenPath = new Path(stagingDirPath.toString, tokenPathStr + ".tmp")
    +            val stream = remoteFs.create(tempTokenPath, true)
    +            // Now write this out to HDFS
    +            newCredentials.writeTokenStorageToStream(stream)
    +            stream.hflush()
    +            stream.close()
    +            // HDFS does reads by inodes now, so just doing a rename should be fine. But I could
    +            // not find a clear explanation of when the blocks on HDFS are deleted. Ideally, we
    +            // would not need this, but just be defensive to ensure we don't mess up the
    +            // credentials. So create a file to show that we are currently updating - if the
    --- End diff --
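
    For context, the pattern the hunk above sketches is the usual write-to-a-temp-file-then-rename
    replace. A minimal standalone sketch of that pattern, with names mirroring the diff but not
    taken from the final code:

        import org.apache.hadoop.fs.{FileSystem, Path}
        import org.apache.hadoop.security.Credentials

        // Write the refreshed credentials to a temp file first, then rename it into
        // place, so readers never observe a partially written token file.
        def replaceTokenFile(fs: FileSystem, tokenPath: Path, creds: Credentials): Unit = {
          val tmp = new Path(tokenPath.toString + ".tmp")
          val out = fs.create(tmp, true) // overwrite any stale temp file
          try {
            creds.writeTokenStorageToStream(out)
            out.hflush()
          } finally {
            out.close()
          }
          // Caveat: FileSystem.rename does not overwrite an existing destination, so a
          // previous token file would have to be removed first - one of the races this
          // review thread is about.
          fs.rename(tmp, tokenPath)
        }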
    
    That was what I planned to do, but I was not entirely sure about the file-listing aspect of
    things - whether it would affect performance. But you are right - there are too many race
    conditions right now. We should avoid it.
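
    For reference, the kind of alternative being discussed here - avoiding both the in-place
    rename and the marker file - is to never overwrite at all: write each credentials refresh to
    a uniquely numbered file and have readers list the directory for the newest one. A minimal
    sketch, assuming a hypothetical credentials-<seq> naming scheme and helper names not taken
    from the PR:

        import org.apache.hadoop.fs.{FileSystem, Path}
        import org.apache.hadoop.security.Credentials

        // Writer: stage each refresh in a temp file, then rename it to a brand-new,
        // uniquely numbered name. The rename cannot collide, and readers that skip
        // .tmp files only ever see fully written credentials.
        def writeNewCredentials(fs: FileSystem, dir: Path, creds: Credentials, seq: Long): Path = {
          val finalFile = new Path(dir, s"credentials-$seq")
          val tmp = new Path(dir, s"credentials-$seq.tmp")
          val out = fs.create(tmp, true)
          try {
            creds.writeTokenStorageToStream(out)
          } finally {
            out.close()
          }
          fs.rename(tmp, finalFile)
          finalFile
        }

        // Reader: list the directory and pick the highest sequence number.
        def latestCredentials(fs: FileSystem, dir: Path): Option[Path] =
          fs.listStatus(dir)
            .map(_.getPath)
            .filter(p => p.getName.startsWith("credentials-") && !p.getName.endsWith(".tmp"))
            .sortBy(_.getName.stripPrefix("credentials-").toLong)
            .lastOption

    Since refreshes track the delegation token renewal interval (hours apart), the extra
    listStatus call per refresh should be cheap, and stale files can be deleted lazily once
    executors have had time to pick up the newest one.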


