spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject git commit: SPARK-1337: Application web UI garbage collects newest stages
Date Fri, 04 Apr 2014 05:34:52 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-0.9 d9c7a808c -> 7f727cf97


SPARK-1337: Application web UI garbage collects newest stages

Simple fix...

Author: Patrick Wendell <pwendell@gmail.com>

Closes #320 from pwendell/stage-clean-up and squashes the following commits:

29be62e [Patrick Wendell] SPARK-1337: Application web UI garbage collects newest stages instead
of old ones
(cherry picked from commit ee6e9e7d863022304ac9ced405b353b63accb6ab)

Conflicts:

	core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
	core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f727cf9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f727cf9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f727cf9

Branch: refs/heads/branch-0.9
Commit: 7f727cf971a63d8d7217c8e1fca8196f80ece4f5
Parents: d9c7a80
Author: Patrick Wendell <pwendell@gmail.com>
Authored: Thu Apr 3 22:13:56 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Thu Apr 3 22:34:21 2014 -0700

----------------------------------------------------------------------
 .../spark/ui/jobs/JobProgressListener.scala     |  8 ++--
 .../ui/jobs/JobProgressListenerSuite.scala      | 40 ++++++++++++++++++--
 2 files changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7f727cf9/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 858a10c..369a7a5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -74,8 +74,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   /** If stages is too large, remove and garbage collect old stages */
   def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > RETAINED_STAGES) {
-      val toRemove = RETAINED_STAGES / 10
-      stages.takeRight(toRemove).foreach( s => {
+      val toRemove = math.max(RETAINED_STAGES / 10, 1)
+      stages.take(toRemove).foreach { s =>
         stageIdToTaskInfos.remove(s.stageId)
         stageIdToTime.remove(s.stageId)
         stageIdToShuffleRead.remove(s.stageId)
@@ -87,8 +87,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
         stageIdToTasksFailed.remove(s.stageId)
         stageIdToPool.remove(s.stageId)
         if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)}
-      })
-      stages.trimEnd(toRemove)
+      }
+      stages.trimStart(toRemove)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7f727cf9/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 67a57a0..348fbe4 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -17,13 +17,47 @@
 
 package org.apache.spark.ui.jobs
 
+import scala.collection.mutable.Buffer
+
 import org.scalatest.FunSuite
-import org.apache.spark.scheduler._
+import org.scalatest.matchers.ShouldMatchers
+
 import org.apache.spark.{LocalSparkContext, SparkContext, Success}
-import org.apache.spark.scheduler.SparkListenerTaskStart
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
+import org.apache.spark.scheduler._
+import org.apache.spark.rdd.EmptyRDD
+
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+  test("test LRU eviction of stages") {
+    System.setProperty("spark.ui.retainedStages", 5.toString)
+    val sc = new SparkContext("local", "test")
+    val listener = new JobProgressListener(sc)
+
+    def createStageStartEvent(stageId: Int) = {
+      val stage = new Stage(stageId, new EmptyRDD(sc), 0, None, List(), 0, None)
+      val stageInfo = new StageInfo(stage, Buffer())
+      SparkListenerStageSubmitted(stageInfo, null)
+    }
+
+    def createStageEndEvent(stageId: Int) = {
+      val stage = new Stage(stageId, new EmptyRDD(sc), 0, None, List(), 0, None)
+      val stageInfo = new StageInfo(stage, Buffer())
+      SparkListenerStageCompleted(stageInfo)
+    }
+
+    for (i <- 1 to 50) {
+      listener.onStageSubmitted(createStageStartEvent(i))
+      listener.onStageCompleted(createStageEndEvent(i))
+    }
+
+    listener.completedStages.size should be (5)
+    listener.completedStages.filter(_.stageId == 50).size should be (1)
+    listener.completedStages.filter(_.stageId == 49).size should be (1)
+    listener.completedStages.filter(_.stageId == 48).size should be (1)
+    listener.completedStages.filter(_.stageId == 47).size should be (1)
+    listener.completedStages.filter(_.stageId == 46).size should be (1)
+  }
 
-class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
   test("test executor id to summary") {
     val sc = new SparkContext("local", "test")
     val listener = new JobProgressListener(sc)


Mime
View raw message