spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-17671][WEBUI] Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications
Date Tue, 04 Oct 2016 09:29:29 GMT
Repository: spark
Updated Branches:
  refs/heads/master 126baa8d3 -> 8e8de0073


[SPARK-17671][WEBUI] Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications

## What changes were proposed in this pull request?

Return Iterator of applications internally in history server, for consistency and performance.
See https://github.com/apache/spark/pull/15248 for some back-story.

The code called by and calling HistoryServer.getApplicationList wants an Iterator, but this
method materializes an Iterable, which potentially causes a performance problem. It's simpler
too to make this internal method also pass through an Iterator.

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #15321 from srowen/SPARK-17671.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e8de007
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e8de007
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e8de007

Branch: refs/heads/master
Commit: 8e8de0073d71bb00baeb24c612d7841b6274f652
Parents: 126baa8
Author: Sean Owen <sowen@cloudera.com>
Authored: Tue Oct 4 10:29:22 2016 +0100
Committer: Sean Owen <sowen@cloudera.com>
Committed: Tue Oct 4 10:29:22 2016 +0100

----------------------------------------------------------------------
 .../history/ApplicationHistoryProvider.scala    |  2 +-
 .../deploy/history/FsHistoryProvider.scala      |  2 +-
 .../spark/deploy/history/HistoryPage.scala      |  5 +--
 .../spark/deploy/history/HistoryServer.scala    |  4 +--
 .../status/api/v1/ApplicationListResource.scala | 38 +++++++-------------
 .../deploy/history/HistoryServerSuite.scala     |  4 +--
 project/MimaExcludes.scala                      |  2 ++
 7 files changed, 22 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e8de007/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index ba42b48..ad7a097 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -77,7 +77,7 @@ private[history] abstract class ApplicationHistoryProvider {
    *
   * @return List of all known applications.
    */
-  def getListing(): Iterable[ApplicationHistoryInfo]
+  def getListing(): Iterator[ApplicationHistoryInfo]
 
   /**
    * Returns the Spark UI for a specific application.

http://git-wip-us.apache.org/repos/asf/spark/blob/8e8de007/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c5740e4..3c2d169 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -222,7 +222,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
+  override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator
 
   override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = {
     applications.get(appId)

http://git-wip-us.apache.org/repos/asf/spark/blob/8e8de007/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index b4f5a61..95b7222 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -29,10 +29,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     val requestedIncomplete =
       Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
 
-    val allApps = parent.getApplicationList()
-      .filter(_.completed != requestedIncomplete)
-    val allAppsSize = allApps.size
-
+    val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete)
     val providerConfig = parent.getProviderConfig()
     val content =
       <div>

http://git-wip-us.apache.org/repos/asf/spark/blob/8e8de007/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 735aa43..087c69e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -174,12 +174,12 @@ class HistoryServer(
    *
    * @return List of all known applications.
    */
-  def getApplicationList(): Iterable[ApplicationHistoryInfo] = {
+  def getApplicationList(): Iterator[ApplicationHistoryInfo] = {
     provider.getListing()
   }
 
   def getApplicationInfoList: Iterator[ApplicationInfo] = {
-    getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+    getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }
 
   def getApplicationInfo(appId: String): Option[ApplicationInfo] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/8e8de007/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
index 075b9ba..7677929 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.status.api.v1
 
-import java.util.{Arrays, Date, List => JList}
+import java.util.{Date, List => JList}
 import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
 import javax.ws.rs.core.MediaType
 
@@ -32,33 +32,21 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) {
       @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam,
       @QueryParam("limit") limit: Integer)
   : Iterator[ApplicationInfo] = {
-    val allApps = uiRoot.getApplicationInfoList
-    val adjStatus = {
-      if (status.isEmpty) {
-        Arrays.asList(ApplicationStatus.values(): _*)
-      } else {
-        status
-      }
-    }
-    val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED)
-    val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING)
-    val appList = allApps.filter { app =>
+
+    val numApps = Option(limit).map(_.toInt).getOrElse(Integer.MAX_VALUE)
+    val includeCompleted = status.isEmpty || status.contains(ApplicationStatus.COMPLETED)
+    val includeRunning = status.isEmpty || status.contains(ApplicationStatus.RUNNING)
+
+    uiRoot.getApplicationInfoList.filter { app =>
       val anyRunning = app.attempts.exists(!_.completed)
-      // if any attempt is still running, we consider the app to also still be running
-      val statusOk = (!anyRunning && includeCompleted) ||
-        (anyRunning && includeRunning)
+      // if any attempt is still running, we consider the app to also still be running;
       // keep the app if *any* attempts fall in the right time window
-      val dateOk = app.attempts.exists { attempt =>
-        attempt.startTime.getTime >= minDate.timestamp &&
-          attempt.startTime.getTime <= maxDate.timestamp
+      ((!anyRunning && includeCompleted) || (anyRunning && includeRunning)) &&
+      app.attempts.exists { attempt =>
+        val start = attempt.startTime.getTime
+        start >= minDate.timestamp && start <= maxDate.timestamp
       }
-      statusOk && dateOk
-    }
-    if (limit != null) {
-      appList.take(limit)
-    } else {
-      appList
-    }
+    }.take(numApps)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8e8de007/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index ae3f5d9..5b316b2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -447,7 +447,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
       assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n")
     }
     val jobcount = getNumJobs("/jobs")
-    assert(!provider.getListing().head.completed)
+    assert(!provider.getListing().next.completed)
 
     listApplications(false) should contain(appId)
 
@@ -455,7 +455,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     resetSparkContext()
     // check the app is now found as completed
     eventually(stdTimeout, stdInterval) {
-      assert(provider.getListing().head.completed,
+      assert(provider.getListing().next.completed,
         s"application never completed, server=$server\n")
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8e8de007/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7362041..163e3f2 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -37,6 +37,8 @@ object MimaExcludes {
   // Exclude rules for 2.1.x
   lazy val v21excludes = v20excludes ++ {
     Seq(
+      // [SPARK-17671] Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.deploy.history.HistoryServer.getApplicationList"),
       // [SPARK-14743] Improve delegation token handling in secure cluster
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal"),
       // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message