spark-reviews mailing list archives

From sryza <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-4495] Fix memory leak in JobProgressLis...
Date Thu, 20 Nov 2014 06:05:23 GMT
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3372#discussion_r20628990
  
    --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---
    @@ -40,41 +40,108 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     
       import JobProgressListener._
     
    +  // Define a handful of type aliases so that data structures' types can serve as documentation.
    +  // These type aliases are public because they're used in the types of public fields:
    +
       type JobId = Int
       type StageId = Int
       type StageAttemptId = Int
    +  type PoolName = String
    +  type ExecutorId = String
     
    -  // How many stages to remember
    -  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
    -  // How many jobs to remember
    -  val retailedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
    +  // Define all of our state:
     
    +  // Jobs:
       val activeJobs = new HashMap[JobId, JobUIData]
       val completedJobs = ListBuffer[JobUIData]()
       val failedJobs = ListBuffer[JobUIData]()
       val jobIdToData = new HashMap[JobId, JobUIData]
     
    +  // Stages:
       val activeStages = new HashMap[StageId, StageInfo]
       val completedStages = ListBuffer[StageInfo]()
       val failedStages = ListBuffer[StageInfo]()
       val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
       val stageIdToInfo = new HashMap[StageId, StageInfo]
    -  
    -  // Number of completed and failed stages, may not actually equal to completedStages.size and
    -  // failedStages.size respectively due to completedStage and failedStages only maintain the latest
    -  // part of the stages, the earlier ones will be removed when there are too many stages for
    -  // memory sake.
    +  val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
    +  // Total of completed and failed stages that have ever been run.  These may be greater than
    +  // `completedStages.size` and `failedStages.size` if we have run more stages or jobs than
    +  // JobProgressListener's retention limits.
       var numCompletedStages = 0
       var numFailedStages = 0
     
    -  // Map from pool name to a hash map (map from stage id to StageInfo).
    -  val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
    -
    -  val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
    +  // Misc:
    +  val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
    +  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
     
       var schedulingMode: Option[SchedulingMode] = None
     
    -  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
    +  // To limit the total memory usage of JobProgressListener, we only track information for a fixed
    +  // number of non-active jobs and stages (there is no limit for active jobs and stages):
    +
    +  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
    +  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
    +
    +  // We can test for memory leaks by ensuring that collections that track non-active jobs and
    +  // stages do not grow without bound and that collections for active jobs/stages eventually become
    +  // empty once Spark is idle.  Let's partition our collections into ones that should be empty
    +  // once Spark is idle and ones that should have a hard- or soft-limited sizes.
    +  // These methods are used by unit tests, but they're defined here so that people don't forget to
    +  // update the tests when adding new collections.  Some collections have multiple levels of
    +  // nesting, etc, so this lets us customize our notion of "size" for each structure:
    +
    +  // These collections should all be empty once Spark is idle (no active stages / jobs):
    +  private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, Int] = {
    +    Map(
    +      "activeStages" -> activeStages.size,
    --- End diff --
    
    The keys in these maps don't seem to be used anywhere; any reason not to just use a list?
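
    For illustration, a rough sketch of how a leak-check test might consume these sizes
    (the test body below is hypothetical, not from the PR; only JobProgressListener and
    getSizesOfActiveStateTrackingCollections come from the diff above). The only place
    the keys would surface is in the assertion message:

        import org.apache.spark.SparkConf
        import org.apache.spark.ui.jobs.JobProgressListener

        // Sketch of a leak check: once Spark is idle, every "active" tracking
        // collection should have drained back to empty.  The accessor is
        // private[spark], so this would live in a test inside the spark package.
        val listener = new JobProgressListener(new SparkConf())
        // ... feed the listener SparkListener events and let all jobs finish ...
        listener.getSizesOfActiveStateTrackingCollections.foreach { case (name, size) =>
          assert(size == 0, s"collection $name should be empty once Spark is idle")
        }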



