spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gatorsmile <...@git.apache.org>
Subject [GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Date Mon, 18 Sep 2017 08:14:53 GMT
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18887#discussion_r139358597
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -720,19 +633,67 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           appId: String,
           attemptId: Option[String],
           prevFileSize: Long)(): Boolean = {
    -    lookup(appId, attemptId) match {
    -      case None =>
    -        logDebug(s"Application Attempt $appId/$attemptId not found")
    -        false
    -      case Some(latest) =>
    -        prevFileSize < latest.fileSize
    +    try {
    +      val attempt = getAttempt(appId, attemptId)
    +      val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
    +      recordedFileSize(logPath) > prevFileSize
    +    } catch {
    +      case _: NoSuchElementException => false
    +    }
    +  }
    +
    +  /**
    +   * Return the last known size of the given event log, recorded the last time the file
    +   * system scanner detected a change in the file.
    +   */
    +  private def recordedFileSize(log: Path): Long = {
    +    try {
    +      listing.read(classOf[LogInfo], log.toString()).fileSize
    +    } catch {
    +      case _: NoSuchElementException => 0L
         }
       }
    +
    +  private def load(appId: String): ApplicationInfoWrapper = {
    +    listing.read(classOf[ApplicationInfoWrapper], appId)
    +  }
    +
    +  /**
    +   * Write the app's information to the given store. Serialized to avoid the (notedly rare) case
    +   * where two threads are processing separate attempts of the same application.
    +   */
    +  private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized {
    +    val attempt = app.attempts.head
    --- End diff --
    
    Why `.head`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message