spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-9202] capping maximum number of executor&driver information kept in Worker
Date Fri, 31 Jul 2015 19:26:55 GMT
Repository: spark
Updated Branches:
  refs/heads/master a8340fa7d -> c0686668a


[SPARK-9202] capping maximum number of executor&driver information kept in Worker

https://issues.apache.org/jira/browse/SPARK-9202

Author: CodingCat <zhunansjtu@gmail.com>

Closes #7714 from CodingCat/SPARK-9202 and squashes the following commits:

23977fb [CodingCat] add comments about why we don't synchronize finishedExecutors & finishedDrivers
dc9772d [CodingCat] addressing the comments
e125241 [CodingCat] stylistic fix
80bfe52 [CodingCat] fix JsonProtocolSuite
d7d9485 [CodingCat] styistic fix and respect insert ordering
031755f [CodingCat] add license info & stylistic fix
c3b5361 [CodingCat] test cases and docs
c557b3a [CodingCat] applications are fine
9cac751 [CodingCat] application is fine...
ad87ed7 [CodingCat] trimFinishedExecutorsAndDrivers


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0686668
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0686668
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0686668

Branch: refs/heads/master
Commit: c0686668ae6a92b6bb4801a55c3b78aedbee816a
Parents: a8340fa
Author: CodingCat <zhunansjtu@gmail.com>
Authored: Fri Jul 31 20:27:00 2015 +0100
Committer: Sean Owen <sowen@cloudera.com>
Committed: Fri Jul 31 20:27:00 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/deploy/worker/Worker.scala | 124 +++++++++++------
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |   4 +-
 .../apache/spark/deploy/DeployTestUtils.scala   |  89 +++++++++++++
 .../apache/spark/deploy/JsonProtocolSuite.scala |  59 ++------
 .../spark/deploy/worker/WorkerSuite.scala       | 133 ++++++++++++++++++-
 docs/configuration.md                           |  14 ++
 6 files changed, 329 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c0686668/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 82e9578..0276c24 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,7 +25,7 @@ import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
 import scala.util.Random
 import scala.util.control.NonFatal
@@ -115,13 +115,18 @@ private[worker] class Worker(
     }
 
   var workDir: File = null
-  val finishedExecutors = new HashMap[String, ExecutorRunner]
+  val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
   val drivers = new HashMap[String, DriverRunner]
   val executors = new HashMap[String, ExecutorRunner]
-  val finishedDrivers = new HashMap[String, DriverRunner]
+  val finishedDrivers = new LinkedHashMap[String, DriverRunner]
   val appDirectories = new HashMap[String, Seq[String]]
   val finishedApps = new HashSet[String]
 
+  val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
+    WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
+  val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
+    WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
+
   // The shuffle service is not actually started unless configured.
   private val shuffleService = new ExternalShuffleService(conf, securityMgr)
 
@@ -461,25 +466,7 @@ private[worker] class Worker(
       }
 
     case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-      sendToMaster(executorStateChanged)
-      val fullId = appId + "/" + execId
-      if (ExecutorState.isFinished(state)) {
-        executors.get(fullId) match {
-          case Some(executor) =>
-            logInfo("Executor " + fullId + " finished with state " + state +
-              message.map(" message " + _).getOrElse("") +
-              exitStatus.map(" exitStatus " + _).getOrElse(""))
-            executors -= fullId
-            finishedExecutors(fullId) = executor
-            coresUsed -= executor.cores
-            memoryUsed -= executor.memory
-          case None =>
-            logInfo("Unknown Executor " + fullId + " finished with state " + state +
-              message.map(" message " + _).getOrElse("") +
-              exitStatus.map(" exitStatus " + _).getOrElse(""))
-        }
-        maybeCleanupApplication(appId)
-      }
+      handleExecutorStateChanged(executorStateChanged)
 
     case KillExecutor(masterUrl, appId, execId) =>
       if (masterUrl != activeMasterUrl) {
@@ -523,24 +510,8 @@ private[worker] class Worker(
       }
     }
 
-    case driverStageChanged @ DriverStateChanged(driverId, state, exception) => {
-      state match {
-        case DriverState.ERROR =>
-          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
-        case DriverState.FAILED =>
-          logWarning(s"Driver $driverId exited with failure")
-        case DriverState.FINISHED =>
-          logInfo(s"Driver $driverId exited successfully")
-        case DriverState.KILLED =>
-          logInfo(s"Driver $driverId was killed by user")
-        case _ =>
-          logDebug(s"Driver $driverId changed state to $state")
-      }
-      sendToMaster(driverStageChanged)
-      val driver = drivers.remove(driverId).get
-      finishedDrivers(driverId) = driver
-      memoryUsed -= driver.driverDesc.mem
-      coresUsed -= driver.driverDesc.cores
+    case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
+      handleDriverStateChanged(driverStateChanged)
     }
 
     case ReregisterWithMaster =>
@@ -614,6 +585,78 @@ private[worker] class Worker(
     webUi.stop()
     metricsSystem.stop()
   }
+
+  private def trimFinishedExecutorsIfNecessary(): Unit = {
+    // do not need to protect with locks since both WorkerPage and Restful server get data through
+    // thread-safe RpcEndPoint
+    if (finishedExecutors.size > retainedExecutors) {
+      finishedExecutors.take(math.max(finishedExecutors.size / 10, 1)).foreach {
+        case (executorId, _) => finishedExecutors.remove(executorId)
+      }
+    }
+  }
+
+  private def trimFinishedDriversIfNecessary(): Unit = {
+    // do not need to protect with locks since both WorkerPage and Restful server get data through
+    // thread-safe RpcEndPoint
+    if (finishedDrivers.size > retainedDrivers) {
+      finishedDrivers.take(math.max(finishedDrivers.size / 10, 1)).foreach {
+        case (driverId, _) => finishedDrivers.remove(driverId)
+      }
+    }
+  }
+
+  private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
+    val driverId = driverStateChanged.driverId
+    val exception = driverStateChanged.exception
+    val state = driverStateChanged.state
+    state match {
+      case DriverState.ERROR =>
+        logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+      case DriverState.FAILED =>
+        logWarning(s"Driver $driverId exited with failure")
+      case DriverState.FINISHED =>
+        logInfo(s"Driver $driverId exited successfully")
+      case DriverState.KILLED =>
+        logInfo(s"Driver $driverId was killed by user")
+      case _ =>
+        logDebug(s"Driver $driverId changed state to $state")
+    }
+    sendToMaster(driverStateChanged)
+    val driver = drivers.remove(driverId).get
+    finishedDrivers(driverId) = driver
+    trimFinishedDriversIfNecessary()
+    memoryUsed -= driver.driverDesc.mem
+    coresUsed -= driver.driverDesc.cores
+  }
+
+  private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
+    Unit = {
+    sendToMaster(executorStateChanged)
+    val state = executorStateChanged.state
+    if (ExecutorState.isFinished(state)) {
+      val appId = executorStateChanged.appId
+      val fullId = appId + "/" + executorStateChanged.execId
+      val message = executorStateChanged.message
+      val exitStatus = executorStateChanged.exitStatus
+      executors.get(fullId) match {
+        case Some(executor) =>
+          logInfo("Executor " + fullId + " finished with state " + state +
+            message.map(" message " + _).getOrElse("") +
+            exitStatus.map(" exitStatus " + _).getOrElse(""))
+          executors -= fullId
+          finishedExecutors(fullId) = executor
+          trimFinishedExecutorsIfNecessary()
+          coresUsed -= executor.cores
+          memoryUsed -= executor.memory
+        case None =>
+          logInfo("Unknown Executor " + fullId + " finished with state " + state +
+            message.map(" message " + _).getOrElse("") +
+            exitStatus.map(" exitStatus " + _).getOrElse(""))
+      }
+      maybeCleanupApplication(appId)
+    }
+  }
 }
 
 private[deploy] object Worker extends Logging {
@@ -669,5 +712,4 @@ private[deploy] object Worker extends Logging {
       cmd
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0686668/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 334a5b1..709a272 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -53,6 +53,8 @@ class WorkerWebUI(
   }
 }
 
-private[ui] object WorkerWebUI {
+private[worker] object WorkerWebUI {
   val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
+  val DEFAULT_RETAINED_DRIVERS = 1000
+  val DEFAULT_RETAINED_EXECUTORS = 1000
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0686668/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
new file mode 100644
index 0000000..967aa09
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+import java.util.Date
+
+import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
+import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.{SecurityManager, SparkConf}
+
+private[deploy] object DeployTestUtils {
+  def createAppDesc(): ApplicationDescription = {
+    val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
+    new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
+  }
+
+  def createAppInfo() : ApplicationInfo = {
+    val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
+      "id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
+    appInfo.endTime = JsonConstants.currTimeInMillis
+    appInfo
+  }
+
+  def createDriverCommand(): Command = new Command(
+    "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
+    Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
+  )
+
+  def createDriverDesc(): DriverDescription =
+    new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())
+
+  def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
+    createDriverDesc(), new Date())
+
+  def createWorkerInfo(): WorkerInfo = {
+    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
+    workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
+    workerInfo
+  }
+
+  def createExecutorRunner(execId: Int): ExecutorRunner = {
+    new ExecutorRunner(
+      "appId",
+      execId,
+      createAppDesc(),
+      4,
+      1234,
+      null,
+      "workerId",
+      "host",
+      123,
+      "publicAddress",
+      new File("sparkHome"),
+      new File("workDir"),
+      "akka://worker",
+      new SparkConf,
+      Seq("localDir"),
+      ExecutorState.RUNNING)
+  }
+
+  def createDriverRunner(driverId: String): DriverRunner = {
+    val conf = new SparkConf()
+    new DriverRunner(
+      conf,
+      driverId,
+      new File("workDir"),
+      new File("sparkHome"),
+      createDriverDesc(),
+      null,
+      "akka://worker",
+      new SecurityManager(conf))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c0686668/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 08529e0..0a9f128 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy
 
-import java.io.File
 import java.util.Date
 
 import com.fasterxml.jackson.core.JsonParseException
@@ -25,12 +24,14 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
-import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState}
+import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.{JsonTestUtils, SparkFunSuite}
 
 class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
 
+  import org.apache.spark.deploy.DeployTestUtils._
+
   test("writeApplicationInfo") {
     val output = JsonProtocol.writeApplicationInfo(createAppInfo())
     assertValidJson(output)
@@ -50,7 +51,7 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
   }
 
   test("writeExecutorRunner") {
-    val output = JsonProtocol.writeExecutorRunner(createExecutorRunner())
+    val output = JsonProtocol.writeExecutorRunner(createExecutorRunner(123))
     assertValidJson(output)
     assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr))
   }
@@ -77,9 +78,10 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
 
   test("writeWorkerState") {
     val executors = List[ExecutorRunner]()
-    val finishedExecutors = List[ExecutorRunner](createExecutorRunner(), createExecutorRunner())
-    val drivers = List(createDriverRunner())
-    val finishedDrivers = List(createDriverRunner(), createDriverRunner())
+    val finishedExecutors = List[ExecutorRunner](createExecutorRunner(123),
+      createExecutorRunner(123))
+    val drivers = List(createDriverRunner("driverId"))
+    val finishedDrivers = List(createDriverRunner("driverId"), createDriverRunner("driverId"))
     val stateResponse = new WorkerStateResponse("host", 8080, "workerId", executors,
       finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
     val output = JsonProtocol.writeWorkerState(stateResponse)
@@ -87,47 +89,6 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
     assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerStateJsonStr))
   }
 
-  def createAppDesc(): ApplicationDescription = {
-    val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
-    new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
-  }
-
-  def createAppInfo() : ApplicationInfo = {
-    val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
-      "id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
-    appInfo.endTime = JsonConstants.currTimeInMillis
-    appInfo
-  }
-
-  def createDriverCommand(): Command = new Command(
-    "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
-    Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
-  )
-
-  def createDriverDesc(): DriverDescription =
-    new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())
-
-  def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
-    createDriverDesc(), new Date())
-
-  def createWorkerInfo(): WorkerInfo = {
-    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
-    workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
-    workerInfo
-  }
-
-  def createExecutorRunner(): ExecutorRunner = {
-    new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
-      "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
-      new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
-  }
-
-  def createDriverRunner(): DriverRunner = {
-    val conf = new SparkConf()
-    new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
-      createDriverDesc(), null, "akka://worker", new SecurityManager(conf))
-  }
-
   def assertValidJson(json: JValue) {
     try {
       JsonMethods.parse(JsonMethods.compact(json))

http://git-wip-us.apache.org/repos/asf/spark/blob/c0686668/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 0f4d3b2..faed4bd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -17,13 +17,18 @@
 
 package org.apache.spark.deploy.worker
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.Command
-
 import org.scalatest.Matchers
 
+import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
+import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.deploy.{Command, ExecutorState}
+import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+
 class WorkerSuite extends SparkFunSuite with Matchers {
 
+  import org.apache.spark.deploy.DeployTestUtils._
+
   def cmd(javaOpts: String*): Command = {
     Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts : _*))
   }
@@ -56,4 +61,126 @@ class WorkerSuite extends SparkFunSuite with Matchers {
           "-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=y", "-Dspark.ssl.opt2=z")
 
   }
+
+  test("test clearing of finishedExecutors (small number of executors)") {
+    val conf = new SparkConf()
+    conf.set("spark.worker.ui.retainedExecutors", 2.toString)
+    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+      "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+    // initialize workers
+    for (i <- 0 until 5) {
+      worker.executors += s"app1/$i" -> createExecutorRunner(i)
+    }
+    // initialize ExecutorStateChanged Message
+    worker.handleExecutorStateChanged(
+      ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
+    assert(worker.finishedExecutors.size === 1)
+    assert(worker.executors.size === 4)
+    for (i <- 1 until 5) {
+      worker.handleExecutorStateChanged(
+        ExecutorStateChanged("app1", i, ExecutorState.EXITED, None, None))
+      assert(worker.finishedExecutors.size === 2)
+      if (i > 1) {
+        assert(!worker.finishedExecutors.contains(s"app1/${i - 2}"))
+      }
+      assert(worker.executors.size === 4 - i)
+    }
+  }
+
+  test("test clearing of finishedExecutors (more executors)") {
+    val conf = new SparkConf()
+    conf.set("spark.worker.ui.retainedExecutors", 30.toString)
+    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+      "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+    // initialize workers
+    for (i <- 0 until 50) {
+      worker.executors += s"app1/$i" -> createExecutorRunner(i)
+    }
+    // initialize ExecutorStateChanged Message
+    worker.handleExecutorStateChanged(
+      ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
+    assert(worker.finishedExecutors.size === 1)
+    assert(worker.executors.size === 49)
+    for (i <- 1 until 50) {
+      val expectedValue = {
+        if (worker.finishedExecutors.size < 30) {
+          worker.finishedExecutors.size + 1
+        } else {
+          28
+        }
+      }
+      worker.handleExecutorStateChanged(
+        ExecutorStateChanged("app1", i, ExecutorState.EXITED, None, None))
+      if (expectedValue == 28) {
+        for (j <- i - 30 until i - 27) {
+          assert(!worker.finishedExecutors.contains(s"app1/$j"))
+        }
+      }
+      assert(worker.executors.size === 49 - i)
+      assert(worker.finishedExecutors.size === expectedValue)
+    }
+  }
+
+  test("test clearing of finishedDrivers (small number of drivers)") {
+    val conf = new SparkConf()
+    conf.set("spark.worker.ui.retainedDrivers", 2.toString)
+    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+      "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+    // initialize workers
+    for (i <- 0 until 5) {
+      val driverId = s"driverId-$i"
+      worker.drivers += driverId -> createDriverRunner(driverId)
+    }
+    // initialize DriverStateChanged Message
+    worker.handleDriverStateChanged(DriverStateChanged("driverId-0", DriverState.FINISHED, None))
+    assert(worker.drivers.size === 4)
+    assert(worker.finishedDrivers.size === 1)
+    for (i <- 1 until 5) {
+      val driverId = s"driverId-$i"
+      worker.handleDriverStateChanged(DriverStateChanged(driverId, DriverState.FINISHED, None))
+      if (i > 1) {
+        assert(!worker.finishedDrivers.contains(s"driverId-${i - 2}"))
+      }
+      assert(worker.drivers.size === 4 - i)
+      assert(worker.finishedDrivers.size === 2)
+    }
+  }
+
+  test("test clearing of finishedDrivers (more drivers)") {
+    val conf = new SparkConf()
+    conf.set("spark.worker.ui.retainedDrivers", 30.toString)
+    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+      "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+    // initialize workers
+    for (i <- 0 until 50) {
+      val driverId = s"driverId-$i"
+      worker.drivers += driverId -> createDriverRunner(driverId)
+    }
+    // initialize DriverStateChanged Message
+    worker.handleDriverStateChanged(DriverStateChanged("driverId-0", DriverState.FINISHED, None))
+    assert(worker.finishedDrivers.size === 1)
+    assert(worker.drivers.size === 49)
+    for (i <- 1 until 50) {
+      val expectedValue = {
+        if (worker.finishedDrivers.size < 30) {
+          worker.finishedDrivers.size + 1
+        } else {
+          28
+        }
+      }
+      val driverId = s"driverId-$i"
+      worker.handleDriverStateChanged(DriverStateChanged(driverId, DriverState.FINISHED, None))
+      if (expectedValue == 28) {
+        for (j <- i - 30 until i - 27) {
+          assert(!worker.finishedDrivers.contains(s"driverId-$j"))
+        }
+      }
+      assert(worker.drivers.size === 49 - i)
+      assert(worker.finishedDrivers.size === expectedValue)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0686668/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index fd23613..24b6063 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -557,6 +557,20 @@ Apart from these, the following properties are also available, and may be useful
     collecting.
   </td>
 </tr>
+<tr>
+  <td><code>spark.worker.ui.retainedExecutors</code></td>
+  <td>1000</td>
+  <td>
+    How many finished executors the Spark UI and status APIs remember before garbage collecting.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.worker.ui.retainedDrivers</code></td>
+  <td>1000</td>
+  <td>
+    How many finished drivers the Spark UI and status APIs remember before garbage collecting.
+  </td>
+</tr>
 </table>
 
 #### Compression and Serialization


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message