spark-reviews mailing list archives

From vanzin <...@git.apache.org>
Subject [GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Date Tue, 12 Sep 2017 23:14:59 GMT
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18887#discussion_r138490176
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -422,208 +455,101 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           }
         }
     
    -    applications.get(appId) match {
    -      case Some(appInfo) =>
    -        try {
    -          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
    -          appInfo.attempts.filter { attempt =>
    -            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
    -          }.foreach { attempt =>
    -            val logPath = new Path(logDir, attempt.logPath)
    -            zipFileToStream(logPath, attempt.logPath, zipStream)
    -          }
    -        } finally {
    -          zipStream.close()
    +    val app = try {
    +      load(appId)
    +    } catch {
    +      case _: NoSuchElementException =>
    +        throw new SparkException(s"Logs for $appId not found.")
    +    }
    +
    +    try {
    +      // If no attempt is specified, or there is no attemptId for attempts, return all attempts
    +      attemptId
    +        .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
    +        .getOrElse(app.attempts)
    +        .map(_.logPath)
    +        .foreach { log =>
    +          zipFileToStream(new Path(logDir, log), log, zipStream)
             }
    -      case None => throw new SparkException(s"Logs for $appId not found.")
    +    } finally {
    +      zipStream.close()
         }
       }
     
       /**
    -   * Replay the log files in the list and merge the list of old applications with new ones
    +   * Replay the given log file, saving the application in the listing db.
        */
       protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
    -    val newAttempts = try {
    -      val eventsFilter: ReplayEventsFilter = { eventString =>
    -        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
    -          eventString.startsWith(APPL_END_EVENT_PREFIX) ||
    -          eventString.startsWith(LOG_START_EVENT_PREFIX)
    -      }
    -
    -      val logPath = fileStatus.getPath()
    -      val appCompleted = isApplicationCompleted(fileStatus)
    -
    -      // Use loading time as lastUpdated since some filesystems don't update modifiedTime
    -      // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
    -      // won't change whenever HistoryServer restarts and reloads the file.
    -      val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
    --- End diff --
    
    Yes, the behavior is the same. It's now handled in `AppListingListener` (`onApplicationStart` and `onApplicationEnd` callbacks).
    
    I don't see an explicit test for this behavior.
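    
    If one were wanted, the property is small enough to pin down directly. A self-contained sketch of what it would assert (the `lastUpdatedFor` helper is hypothetical; the provider inlines this logic rather than exposing it):
    
        // Hypothetical helper expressing the rule under test.
        def lastUpdatedFor(completed: Boolean, modTime: Long, loadTime: Long): Long =
          if (completed) modTime else loadTime
        
        // In-progress apps track load time; completed apps pin to the log's
        // modification time, so a restart doesn't bump lastUpdated.
        assert(lastUpdatedFor(completed = false, modTime = 1000L, loadTime = 2000L) == 2000L)
        assert(lastUpdatedFor(completed = true, modTime = 1000L, loadTime = 2000L) == 1000L)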


