spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vanzin <...@git.apache.org>
Subject [GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Date Fri, 02 Nov 2018 18:28:18 GMT
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r230463888
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
---
    @@ -812,18 +821,74 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock:
Clock)
           .reverse()
           .first(maxTime)
           .asScala
    +      .filter(l => l.logType == LogType.EventLogs)
           .toList
         stale.foreach { log =>
           if (log.appId.isEmpty) {
             logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
    -        deleteLog(new Path(log.logPath))
    +        deleteLog(fs, new Path(log.logPath))
             listing.delete(classOf[LogInfo], log.logPath)
           }
         }
         // Clean the blacklist from the expired entries.
         clearBlacklist(CLEAN_INTERVAL_S)
       }
     
    +  /**
    +   * Delete driver logs from the configured spark dfs dir that exceed the configured
max age
    +   */
    +  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
    +    val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get
    +    val driverLogFs = new Path(driverLogDir).getFileSystem(hadoopConf)
    +    val currentTime = clock.getTimeMillis()
    +    val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
    +    val logFiles = driverLogFs.listLocatedStatus(new Path(driverLogDir))
    +    while (logFiles.hasNext()) {
    +      val f = logFiles.next()
    +      // Do not rely on 'modtime' as it is not updated for all filesystems when files
are written to
    +      val deleteFile =
    +        try {
    +          val info = listing.read(classOf[LogInfo], f.getPath().toString())
    +          // Update the lastprocessedtime of file if its length or modification time
has changed
    +          if (info.fileSize < f.getLen() || info.lastProcessed < f.getModificationTime())
{
    +            listing.write(
    +              info.copy(lastProcessed = currentTime, fileSize = f.getLen()))
    +            false
    +          } else if (info.lastProcessed > maxTime) {
    +            false
    +          } else {
    +            true
    +          }
    +        } catch {
    +          case e: NoSuchElementException =>
    +            // For every new driver log file discovered, create a new entry in listing
    +            listing.write(LogInfo(f.getPath().toString(), currentTime, LogType.DriverLogs,
None,
    +              None, f.getLen()))
    +          false
    +        }
    +      if (deleteFile) {
    +        logInfo(s"Deleting expired driver log for: ${f.getPath().getName()}")
    +        listing.delete(classOf[LogInfo], f.getPath().toString())
    +        deleteLog(driverLogFs, f.getPath())
    +      }
    +    }
    +
    +    // Delete driver log file entries that exceed the configured max age and
    +    // may have been deleted on filesystem externally.
    +    val stale = listing.view(classOf[LogInfo])
    +      .index("lastProcessed")
    +      .reverse()
    +      .first(maxTime)
    +      .asScala
    +      .filter(i => i.logType == LogType.DriverLogs)
    --- End diff --
    
    `.filter { i => ... }`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message