spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gatorsmile <...@git.apache.org>
Subject [GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Date Mon, 18 Sep 2017 08:14:53 GMT
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18887#discussion_r139348395
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -742,53 +703,150 @@ private[history] object FsHistoryProvider {
       private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
     
       private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\""
    +
    +  /**
    +   * Current version of the data written to the listing database. When opening an existing
    +   * db, if the version does not match this value, the FsHistoryProvider will throw away
    +   * all data and re-generate the listing data from the event logs.
    +   */
    +  private val CURRENT_LISTING_VERSION = 1L
     }
     
     /**
    - * Application attempt information.
    - *
    - * @param logPath path to the log file, or, for a legacy log, its directory
    - * @param name application name
    - * @param appId application ID
    - * @param attemptId optional attempt ID
    - * @param startTime start time (from playback)
    - * @param endTime end time (from playback). -1 if the application is incomplete.
    - * @param lastUpdated the modification time of the log file when this entry was built by replaying
    - *                    the history.
    - * @param sparkUser user running the application
    - * @param completed flag to indicate whether or not the application has completed.
    - * @param fileSize the size of the log file the last time the file was scanned for changes
    + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
    + * the API serializer.
      */
    -private class FsApplicationAttemptInfo(
    +private class KVStoreScalaSerializer extends KVStoreSerializer {
    +
    +  mapper.registerModule(DefaultScalaModule)
    +  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
    +  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
    +
    +}
    +
    +private[history] case class KVStoreMetadata(
    +  version: Long,
    +  logDir: String)
    +
    +private[history] case class LogInfo(
    +  @KVIndexParam logPath: String,
    +  fileSize: Long)
    +
    +private[history] class AttemptInfoWrapper(
    +    val info: v1.ApplicationAttemptInfo,
         val logPath: String,
    -    val name: String,
    -    val appId: String,
    -    attemptId: Option[String],
    -    startTime: Long,
    -    endTime: Long,
    -    lastUpdated: Long,
    -    sparkUser: String,
    -    completed: Boolean,
    -    val fileSize: Long,
    -    appSparkVersion: String)
    -  extends ApplicationAttemptInfo(
    -      attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) {
    -
    -  /** extend the superclass string value with the extra attributes of this class */
    -  override def toString: String = {
    -    s"FsApplicationAttemptInfo($name, $appId," +
    -      s" ${super.toString}, source=$logPath, size=$fileSize"
    +    val fileSize: Long) {
    +
    +  def toAppAttemptInfo(): ApplicationAttemptInfo = {
    +    ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(),
    +      info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser,
    +      info.completed, info.appSparkVersion)
       }
    +
     }
     
    -/**
    - * Application history information
    - * @param id application ID
    - * @param name application name
    - * @param attempts list of attempts, most recent first.
    - */
    -private class FsApplicationHistoryInfo(
    -    id: String,
    -    override val name: String,
    -    override val attempts: List[FsApplicationAttemptInfo])
    -  extends ApplicationHistoryInfo(id, name, attempts)
    +private[history] class ApplicationInfoWrapper(
    +    val info: v1.ApplicationInfo,
    +    val attempts: List[AttemptInfoWrapper]) {
    +
    +  @JsonIgnore @KVIndexParam
    +  def id: String = info.id
    +
    +  @JsonIgnore @KVIndexParam("endTime")
    +  def endTime(): Long = attempts.head.info.endTime.getTime()
    +
    +  @JsonIgnore @KVIndexParam("oldestAttempt")
    +  def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min
    +
    +  def toAppHistoryInfo(): ApplicationHistoryInfo = {
    +    ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo()))
    +  }
    +
    +  def toApiInfo(): v1.ApplicationInfo = {
    +    new v1.ApplicationInfo(info.id, info.name, info.coresGranted, info.maxCores,
    +      info.coresPerExecutor, info.memoryPerExecutorMB, attempts.map(_.info))
    +  }
    +
    +}
    +
    +private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener {
    +
    +  private val app = new MutableApplicationInfo()
    +  private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen())
    +
    +  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    +    app.id = event.appId.orNull
    --- End diff --
    
    Based on the existing code, it sounds like we need to change it to `event.appId.getOrElse(log.getPath().getName())`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message