spark-reviews mailing list archives

From vanzin <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-5342][YARN] Allow long running Spark ap...
Date Thu, 26 Feb 2015 23:33:21 GMT
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4688#discussion_r25476432
  
    --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
    @@ -82,6 +93,102 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
         if (credentials != null) credentials.getSecretKey(new Text(key)) else null
       }
     
    +  override def setPrincipalAndKeytabForLogin(principal: String, keytab: String): Unit = {
    +    loginPrincipal = Option(principal)
    +    keytabFile = Option(keytab)
    +  }
    +
    +  private[spark] override def scheduleLoginFromKeytab(
    +    callback: (SerializableBuffer) => Unit): Unit = {
    +
    +    loginPrincipal match {
    +      case Some(principal) =>
    +        val keytab = keytabFile.get
    +        val remoteFs = FileSystem.get(conf)
    +        val remoteKeytabPath = new Path(
    +          remoteFs.getHomeDirectory, System.getenv("SPARK_STAGING_DIR") + Path.SEPARATOR + keytab)
    +        val localFS = FileSystem.getLocal(conf)
    +        // At this point, SparkEnv is likely not initialized, so create a dir and put the keytab there.
    +        val tempDir = Utils.createTempDir()
    +        val localURI = new URI(tempDir.getAbsolutePath + Path.SEPARATOR + keytab)
    +        val qualifiedURI = new URI(localFS.makeQualified(new Path(localURI)).toString)
    +        FileUtil.copy(
    +          remoteFs, remoteKeytabPath, localFS, new Path(qualifiedURI), false, false, conf)
    +        // Get the current credentials, find out when they expire.
    +        val creds = UserGroupInformation.getCurrentUser.getCredentials
    +        val credStream = new ByteArrayOutputStream()
    +        creds.writeTokenStorageToStream(new DataOutputStream(credStream))
    +        val in = new DataInputStream(new ByteArrayInputStream(credStream.toByteArray))
    +        val tokenIdentifier = new DelegationTokenIdentifier()
    +        tokenIdentifier.readFields(in)
    +        val timeToRenewal = (0.6 * (tokenIdentifier.getMaxDate - System.currentTimeMillis())).toLong
    +        Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    +          override def newThread(r: Runnable): Thread = {
    +            val t = new Thread(r)
    +            t.setName("Delegation Token Refresh Thread")
    +            t.setDaemon(true)
    +            t
    +          }
    +        }).scheduleWithFixedDelay(new Runnable {
    +          override def run(): Unit = {
    +            if (!loggedInViaKeytab.get()) {
    --- End diff --
    
    Since this is a single-threaded executor, do you really need the atomic boolean?
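
    A minimal sketch of that point, assuming the flag is only read and written from inside the scheduled task itself (the object name, the 60-second delay, and the placeholder comments are illustrative, not taken from this PR):

        import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

        object SingleThreadFlagSketch {
          def main(args: Array[String]): Unit = {
            // A plain var is enough here: scheduleWithFixedDelay on a
            // single-threaded executor runs every execution on the same thread,
            // so a write made in one run is visible to the next run without any
            // extra synchronization.
            var loggedInViaKeytab = false

            val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
              override def newThread(r: Runnable): Thread = {
                val t = new Thread(r, "Delegation Token Refresh Thread")
                t.setDaemon(true)
                t
              }
            })

            executor.scheduleWithFixedDelay(new Runnable {
              override def run(): Unit = {
                if (!loggedInViaKeytab) {
                  // ... log in from the keytab once ...
                  loggedInViaKeytab = true
                }
                // ... renew or re-fetch delegation tokens ...
              }
            }, 0, 60, TimeUnit.SECONDS)
          }
        }

    If the flag ever needs to be checked from a different thread (for example the driver's main thread), a @volatile var or the existing AtomicBoolean would still be the safer choice.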

