spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iras...@apache.org
Subject spark git commit: [SPARK-20649][CORE] Simplify REST API resource structure.
Date Wed, 15 Nov 2017 21:42:02 GMT
Repository: spark
Updated Branches:
  refs/heads/master bc0848b4c -> 39b3f10dd


[SPARK-20649][CORE] Simplify REST API resource structure.

With the new UI store, the API resource classes have a lot less code,
since there's no need for complicated translations between the UI
types and the API types. So the code ended up with a bunch of files
with a single method declared in them.

This change re-structures the API code so that it uses less classes;
mainly, most sub-resources were removed, and the code to deal with
single-attempt and multi-attempt apps was simplified.

The only change was the addition of a method to return a single
attempt's information; that was missing in the old API, so trying
to retrieve "/v1/applications/appId/attemptId" would result in a
404 even if the attempt existed (and URIs under that one would
return valid data).

The streaming API resources also overtook the same treatment, even
though the data is not stored in the new UI store.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19748 from vanzin/SPARK-20649.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39b3f10d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39b3f10d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39b3f10d

Branch: refs/heads/master
Commit: 39b3f10dda73f4a1f735f17467e5c6c45c44e977
Parents: bc0848b
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Wed Nov 15 15:41:53 2017 -0600
Committer: Imran Rashid <irashid@cloudera.com>
Committed: Wed Nov 15 15:41:53 2017 -0600

----------------------------------------------------------------------
 .../status/api/v1/AllExecutorListResource.scala |  30 ---
 .../spark/status/api/v1/AllJobsResource.scala   |  35 ----
 .../spark/status/api/v1/AllRDDResource.scala    |  31 ---
 .../spark/status/api/v1/AllStagesResource.scala |  33 ---
 .../spark/status/api/v1/ApiRootResource.scala   | 203 ++-----------------
 .../api/v1/ApplicationEnvironmentResource.scala |  32 ---
 .../status/api/v1/ApplicationListResource.scala |   2 +-
 .../api/v1/EventLogDownloadResource.scala       |  71 -------
 .../status/api/v1/ExecutorListResource.scala    |  30 ---
 .../status/api/v1/OneApplicationResource.scala  | 146 ++++++++++++-
 .../spark/status/api/v1/OneJobResource.scala    |  38 ----
 .../spark/status/api/v1/OneRDDResource.scala    |  38 ----
 .../spark/status/api/v1/OneStageResource.scala  |  89 --------
 .../spark/status/api/v1/StagesResource.scala    |  97 +++++++++
 .../spark/status/api/v1/VersionResource.scala   |  30 ---
 .../api/v1/streaming/AllBatchesResource.scala   |  78 -------
 .../streaming/AllOutputOperationsResource.scala |  66 ------
 .../api/v1/streaming/AllReceiversResource.scala |  76 -------
 .../api/v1/streaming/ApiStreamingApp.scala      |  31 ++-
 .../v1/streaming/ApiStreamingRootResource.scala | 172 +++++++++++++---
 .../api/v1/streaming/OneBatchResource.scala     |  35 ----
 .../streaming/OneOutputOperationResource.scala  |  39 ----
 .../api/v1/streaming/OneReceiverResource.scala  |  35 ----
 .../streaming/StreamingStatisticsResource.scala |  64 ------
 24 files changed, 425 insertions(+), 1076 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala
deleted file mode 100644
index 5522f4c..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import javax.ws.rs.{GET, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.ui.SparkUI
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class AllExecutorListResource(ui: SparkUI) {
-
-  @GET
-  def executorList(): Seq[ExecutorSummary] = ui.store.executorList(false)
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
deleted file mode 100644
index b4fa3e6..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import java.util.{Arrays, Date, List => JList}
-import javax.ws.rs._
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.JobExecutionStatus
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.UIData.JobUIData
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class AllJobsResource(ui: SparkUI) {
-
-  @GET
-  def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = {
-    ui.store.jobsList(statuses)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
deleted file mode 100644
index 2189e1d..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import javax.ws.rs.{GET, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils}
-import org.apache.spark.ui.SparkUI
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class AllRDDResource(ui: SparkUI) {
-
-  @GET
-  def rddList(): Seq[RDDStorageInfo] = ui.store.rddList()
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
deleted file mode 100644
index e1c91cb..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import java.util.{List => JList}
-import javax.ws.rs.{GET, Produces, QueryParam}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.ui.SparkUI
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class AllStagesResource(ui: SparkUI) {
-
-  @GET
-  def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = {
-    ui.store.stageList(statuses)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index 9d38330..ed9bdc6 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -44,189 +44,14 @@ import org.apache.spark.ui.SparkUI
 private[v1] class ApiRootResource extends ApiRequestContext {
 
   @Path("applications")
-  def getApplicationList(): ApplicationListResource = {
-    new ApplicationListResource(uiRoot)
-  }
+  def applicationList(): Class[ApplicationListResource] = classOf[ApplicationListResource]
 
   @Path("applications/{appId}")
-  def getApplication(): OneApplicationResource = {
-    new OneApplicationResource(uiRoot)
-  }
-
-  @Path("applications/{appId}/{attemptId}/jobs")
-  def getJobs(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): AllJobsResource = {
-    withSparkUI(appId, Some(attemptId)) { ui =>
-      new AllJobsResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/jobs")
-  def getJobs(@PathParam("appId") appId: String): AllJobsResource = {
-    withSparkUI(appId, None) { ui =>
-      new AllJobsResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/jobs/{jobId: \\d+}")
-  def getJob(@PathParam("appId") appId: String): OneJobResource = {
-    withSparkUI(appId, None) { ui =>
-      new OneJobResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/{attemptId}/jobs/{jobId: \\d+}")
-  def getJob(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): OneJobResource = {
-    withSparkUI(appId, Some(attemptId)) { ui =>
-      new OneJobResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/executors")
-  def getExecutors(@PathParam("appId") appId: String): ExecutorListResource = {
-    withSparkUI(appId, None) { ui =>
-      new ExecutorListResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/allexecutors")
-  def getAllExecutors(@PathParam("appId") appId: String): AllExecutorListResource = {
-    withSparkUI(appId, None) { ui =>
-      new AllExecutorListResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/{attemptId}/executors")
-  def getExecutors(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): ExecutorListResource = {
-    withSparkUI(appId, Some(attemptId)) { ui =>
-      new ExecutorListResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/{attemptId}/allexecutors")
-  def getAllExecutors(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): AllExecutorListResource = {
-    withSparkUI(appId, Some(attemptId)) { ui =>
-      new AllExecutorListResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/stages")
-  def getStages(@PathParam("appId") appId: String): AllStagesResource = {
-    withSparkUI(appId, None) { ui =>
-      new AllStagesResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/{attemptId}/stages")
-  def getStages(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): AllStagesResource = {
-    withSparkUI(appId, Some(attemptId)) { ui =>
-      new AllStagesResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/stages/{stageId: \\d+}")
-  def getStage(@PathParam("appId") appId: String): OneStageResource = {
-    withSparkUI(appId, None) { ui =>
-      new OneStageResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/{attemptId}/stages/{stageId: \\d+}")
-  def getStage(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): OneStageResource = {
-    withSparkUI(appId, Some(attemptId)) { ui =>
-      new OneStageResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/storage/rdd")
-  def getRdds(@PathParam("appId") appId: String): AllRDDResource = {
-    withSparkUI(appId, None) { ui =>
-      new AllRDDResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/{attemptId}/storage/rdd")
-  def getRdds(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): AllRDDResource = {
-    withSparkUI(appId, Some(attemptId)) { ui =>
-      new AllRDDResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/storage/rdd/{rddId: \\d+}")
-  def getRdd(@PathParam("appId") appId: String): OneRDDResource = {
-    withSparkUI(appId, None) { ui =>
-      new OneRDDResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/{attemptId}/storage/rdd/{rddId: \\d+}")
-  def getRdd(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): OneRDDResource = {
-    withSparkUI(appId, Some(attemptId)) { ui =>
-      new OneRDDResource(ui)
-    }
-  }
-
-  @Path("applications/{appId}/logs")
-  def getEventLogs(
-      @PathParam("appId") appId: String): EventLogDownloadResource = {
-    try {
-      // withSparkUI will throw NotFoundException if attemptId exists for this application.
-      // So we need to try again with attempt id "1".
-      withSparkUI(appId, None) { _ =>
-        new EventLogDownloadResource(uiRoot, appId, None)
-      }
-    } catch {
-      case _: NotFoundException =>
-        withSparkUI(appId, Some("1")) { _ =>
-          new EventLogDownloadResource(uiRoot, appId, None)
-        }
-    }
-  }
-
-  @Path("applications/{appId}/{attemptId}/logs")
-  def getEventLogs(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
-    withSparkUI(appId, Some(attemptId)) { _ =>
-      new EventLogDownloadResource(uiRoot, appId, Some(attemptId))
-    }
-  }
+  def application(): Class[OneApplicationResource] = classOf[OneApplicationResource]
 
   @Path("version")
-  def getVersion(): VersionResource = {
-    new VersionResource(uiRoot)
-  }
-
-  @Path("applications/{appId}/environment")
-  def getEnvironment(@PathParam("appId") appId: String): ApplicationEnvironmentResource = {
-    withSparkUI(appId, None) { ui =>
-      new ApplicationEnvironmentResource(ui)
-    }
-  }
+  def version(): VersionInfo = new VersionInfo(org.apache.spark.SPARK_VERSION)
 
-  @Path("applications/{appId}/{attemptId}/environment")
-  def getEnvironment(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): ApplicationEnvironmentResource = {
-    withSparkUI(appId, Some(attemptId)) { ui =>
-      new ApplicationEnvironmentResource(ui)
-    }
-  }
 }
 
 private[spark] object ApiRootResource {
@@ -293,23 +118,29 @@ private[v1] trait ApiRequestContext {
 
   def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
 
+}
 
-  /**
-   * Get the spark UI with the given appID, and apply a function
-   * to it.  If there is no such app, throw an appropriate exception
-   */
-  def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = {
+/**
+ * Base class for resource handlers that use app-specific data. Abstracts away dealing with
+ * application and attempt IDs, and finding the app's UI.
+ */
+private[v1] trait BaseAppResource extends ApiRequestContext {
+
+  @PathParam("appId") protected[this] var appId: String = _
+  @PathParam("attemptId") protected[this] var attemptId: String = _
+
+  protected def withUI[T](fn: SparkUI => T): T = {
     try {
-      uiRoot.withSparkUI(appId, attemptId) { ui =>
+      uiRoot.withSparkUI(appId, Option(attemptId)) { ui =>
         val user = httpRequest.getRemoteUser()
         if (!ui.securityManager.checkUIViewPermissions(user)) {
           throw new ForbiddenException(raw"""user "$user" is not authorized""")
         }
-        f(ui)
+        fn(ui)
       }
     } catch {
       case _: NoSuchElementException =>
-        val appKey = attemptId.map(appId + "/" + _).getOrElse(appId)
+        val appKey = Option(attemptId).map(appId + "/" + _).getOrElse(appId)
         throw new NotFoundException(s"no such app: $appKey")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala
deleted file mode 100644
index e702f8a..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import javax.ws.rs._
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.ui.SparkUI
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class ApplicationEnvironmentResource(ui: SparkUI) {
-
-  @GET
-  def getEnvironmentInfo(): ApplicationEnvironmentInfo = {
-    ui.store.environmentInfo()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
index f039744..91660a5 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -23,7 +23,7 @@ import javax.ws.rs.core.MediaType
 import org.apache.spark.deploy.history.ApplicationHistoryInfo
 
 @Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class ApplicationListResource(uiRoot: UIRoot) {
+private[v1] class ApplicationListResource extends ApiRequestContext {
 
   @GET
   def appList(

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
deleted file mode 100644
index c84022d..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import java.io.OutputStream
-import java.util.zip.ZipOutputStream
-import javax.ws.rs.{GET, Produces}
-import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
-
-import scala.util.control.NonFatal
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
-
-@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
-private[v1] class EventLogDownloadResource(
-    val uIRoot: UIRoot,
-    val appId: String,
-    val attemptId: Option[String]) extends Logging {
-  val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf)
-
-  @GET
-  def getEventLogs(): Response = {
-    try {
-      val fileName = {
-        attemptId match {
-          case Some(id) => s"eventLogs-$appId-$id.zip"
-          case None => s"eventLogs-$appId.zip"
-        }
-      }
-
-      val stream = new StreamingOutput {
-        override def write(output: OutputStream): Unit = {
-          val zipStream = new ZipOutputStream(output)
-          try {
-            uIRoot.writeEventLogs(appId, attemptId, zipStream)
-          } finally {
-            zipStream.close()
-          }
-
-        }
-      }
-
-      Response.ok(stream)
-        .header("Content-Disposition", s"attachment; filename=$fileName")
-        .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
-        .build()
-    } catch {
-      case NonFatal(e) =>
-        Response.serverError()
-          .entity(s"Event logs are not available for app: $appId.")
-          .status(Response.Status.SERVICE_UNAVAILABLE)
-          .build()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
deleted file mode 100644
index 975101c..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import javax.ws.rs.{GET, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.ui.SparkUI
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class ExecutorListResource(ui: SparkUI) {
-
-  @GET
-  def executorList(): Seq[ExecutorSummary] = ui.store.executorList(true)
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
index 18c3e2f..bd4df07 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -16,16 +16,150 @@
  */
 package org.apache.spark.status.api.v1
 
-import javax.ws.rs.{GET, PathParam, Produces}
-import javax.ws.rs.core.MediaType
+import java.io.OutputStream
+import java.util.{List => JList}
+import java.util.zip.ZipOutputStream
+import javax.ws.rs.{GET, Path, PathParam, Produces, QueryParam}
+import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.SparkUI
 
 @Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class OneApplicationResource(uiRoot: UIRoot) {
+private[v1] class AbstractApplicationResource extends BaseAppResource {
+
+  @GET
+  @Path("jobs")
+  def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = {
+    withUI(_.store.jobsList(statuses))
+  }
+
+  @GET
+  @Path("jobs/{jobId: \\d+}")
+  def oneJob(@PathParam("jobId") jobId: Int): JobData = withUI { ui =>
+    try {
+      ui.store.job(jobId)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new NotFoundException("unknown job: " + jobId)
+    }
+  }
+
+  @GET
+  @Path("executors")
+  def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true))
+
+  @GET
+  @Path("allexecutors")
+  def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
+
+  @Path("stages")
+  def stages(): Class[StagesResource] = classOf[StagesResource]
+
+  @GET
+  @Path("storage/rdd")
+  def rddList(): Seq[RDDStorageInfo] = withUI(_.store.rddList())
+
+  @GET
+  @Path("storage/rdd/{rddId: \\d+}")
+  def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = withUI { ui =>
+    try {
+      ui.store.rdd(rddId)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new NotFoundException(s"no rdd found w/ id $rddId")
+    }
+  }
+
+  @GET
+  @Path("environment")
+  def environmentInfo(): ApplicationEnvironmentInfo = withUI(_.store.environmentInfo())
+
+  @GET
+  @Path("logs")
+  @Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
+  def getEventLogs(): Response = {
+    // Retrieve the UI for the application just to do access permission checks. For backwards
+    // compatibility, this code also tries with attemptId "1" if the UI without an attempt ID does
+    // not exist.
+    try {
+      withUI { _ => }
+    } catch {
+      case _: NotFoundException if attemptId == null =>
+        attemptId = "1"
+        withUI { _ => }
+        attemptId = null
+    }
+
+    try {
+      val fileName = if (attemptId != null) {
+        s"eventLogs-$appId-$attemptId.zip"
+      } else {
+        s"eventLogs-$appId.zip"
+      }
+
+      val stream = new StreamingOutput {
+        override def write(output: OutputStream): Unit = {
+          val zipStream = new ZipOutputStream(output)
+          try {
+            uiRoot.writeEventLogs(appId, Option(attemptId), zipStream)
+          } finally {
+            zipStream.close()
+          }
+
+        }
+      }
+
+      Response.ok(stream)
+        .header("Content-Disposition", s"attachment; filename=$fileName")
+        .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
+        .build()
+    } catch {
+      case NonFatal(e) =>
+        Response.serverError()
+          .entity(s"Event logs are not available for app: $appId.")
+          .status(Response.Status.SERVICE_UNAVAILABLE)
+          .build()
+    }
+  }
+
+  /**
+   * This method needs to be last, otherwise it clashes with the paths for the above methods
+   * and causes JAX-RS to not find things.
+   */
+  @Path("{attemptId}")
+  def applicationAttempt(): Class[OneApplicationAttemptResource] = {
+    if (attemptId != null) {
+      throw new NotFoundException(httpRequest.getRequestURI())
+    }
+    classOf[OneApplicationAttemptResource]
+  }
+
+}
+
+private[v1] class OneApplicationResource extends AbstractApplicationResource {
+
+  @GET
+  def getApp(): ApplicationInfo = {
+    val app = uiRoot.getApplicationInfo(appId)
+    app.getOrElse(throw new NotFoundException("unknown app: " + appId))
+  }
+
+}
+
+private[v1] class OneApplicationAttemptResource extends AbstractApplicationResource {
 
   @GET
-  def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
-    val apps = uiRoot.getApplicationInfo(appId)
-    apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
+  def getAttempt(): ApplicationAttemptInfo = {
+    uiRoot.getApplicationInfo(appId)
+      .flatMap { app =>
+        app.attempts.filter(_.attemptId == attemptId).headOption
+      }
+      .getOrElse {
+        throw new NotFoundException(s"unknown app $appId, attempt $attemptId")
+      }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
deleted file mode 100644
index 3ee884e..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import java.util.NoSuchElementException
-import javax.ws.rs.{GET, PathParam, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.ui.SparkUI
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class OneJobResource(ui: SparkUI) {
-
-  @GET
-  def oneJob(@PathParam("jobId") jobId: Int): JobData = {
-    try {
-      ui.store.job(jobId)
-    } catch {
-      case _: NoSuchElementException =>
-        throw new NotFoundException("unknown job: " + jobId)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
deleted file mode 100644
index ca9758c..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import java.util.NoSuchElementException
-import javax.ws.rs.{GET, PathParam, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.ui.SparkUI
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class OneRDDResource(ui: SparkUI) {
-
-  @GET
-  def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
-    try {
-      ui.store.rdd(rddId)
-    } catch {
-      case _: NoSuchElementException =>
-        throw new NotFoundException(s"no rdd found w/ id $rddId")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
deleted file mode 100644
index 20dd73e..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import javax.ws.rs._
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.SparkException
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.status.api.v1.StageStatus._
-import org.apache.spark.status.api.v1.TaskSorting._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.UIData.StageUIData
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class OneStageResource(ui: SparkUI) {
-
-  @GET
-  @Path("")
-  def stageData(
-      @PathParam("stageId") stageId: Int,
-      @QueryParam("details") @DefaultValue("true") details: Boolean): Seq[StageData] = {
-    val ret = ui.store.stageData(stageId, details = details)
-    if (ret.nonEmpty) {
-      ret
-    } else {
-      throw new NotFoundException(s"unknown stage: $stageId")
-    }
-  }
-
-  @GET
-  @Path("/{stageAttemptId: \\d+}")
-  def oneAttemptData(
-      @PathParam("stageId") stageId: Int,
-      @PathParam("stageAttemptId") stageAttemptId: Int,
-      @QueryParam("details") @DefaultValue("true") details: Boolean): StageData = {
-    try {
-      ui.store.stageAttempt(stageId, stageAttemptId, details = details)
-    } catch {
-      case _: NoSuchElementException =>
-        throw new NotFoundException(s"unknown attempt $stageAttemptId for stage $stageId.")
-    }
-  }
-
-  @GET
-  @Path("/{stageAttemptId: \\d+}/taskSummary")
-  def taskSummary(
-      @PathParam("stageId") stageId: Int,
-      @PathParam("stageAttemptId") stageAttemptId: Int,
-      @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String)
-  : TaskMetricDistributions = {
-    val quantiles = quantileString.split(",").map { s =>
-      try {
-        s.toDouble
-      } catch {
-        case nfe: NumberFormatException =>
-          throw new BadParameterException("quantiles", "double", s)
-      }
-    }
-
-    ui.store.taskSummary(stageId, stageAttemptId, quantiles)
-  }
-
-  @GET
-  @Path("/{stageAttemptId: \\d+}/taskList")
-  def taskList(
-      @PathParam("stageId") stageId: Int,
-      @PathParam("stageAttemptId") stageAttemptId: Int,
-      @DefaultValue("0") @QueryParam("offset") offset: Int,
-      @DefaultValue("20") @QueryParam("length") length: Int,
-      @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = {
-    ui.store.taskList(stageId, stageAttemptId, offset, length, sortBy)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
new file mode 100644
index 0000000..bd4dfe3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.{List => JList}
+import javax.ws.rs._
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.SparkException
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.status.api.v1.StageStatus._
+import org.apache.spark.status.api.v1.TaskSorting._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.UIData.StageUIData
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class StagesResource extends BaseAppResource {
+
+  @GET
+  def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = {
+    withUI(_.store.stageList(statuses))
+  }
+
+  @GET
+  @Path("{stageId: \\d+}")
+  def stageData(
+      @PathParam("stageId") stageId: Int,
+      @QueryParam("details") @DefaultValue("true") details: Boolean): Seq[StageData] = {
+    withUI { ui =>
+      val ret = ui.store.stageData(stageId, details = details)
+      if (ret.nonEmpty) {
+        ret
+      } else {
+        throw new NotFoundException(s"unknown stage: $stageId")
+      }
+    }
+  }
+
+  @GET
+  @Path("{stageId: \\d+}/{stageAttemptId: \\d+}")
+  def oneAttemptData(
+      @PathParam("stageId") stageId: Int,
+      @PathParam("stageAttemptId") stageAttemptId: Int,
+      @QueryParam("details") @DefaultValue("true") details: Boolean): StageData = withUI { ui =>
+    try {
+      ui.store.stageAttempt(stageId, stageAttemptId, details = details)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new NotFoundException(s"unknown attempt $stageAttemptId for stage $stageId.")
+    }
+  }
+
+  @GET
+  @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskSummary")
+  def taskSummary(
+      @PathParam("stageId") stageId: Int,
+      @PathParam("stageAttemptId") stageAttemptId: Int,
+      @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String)
+  : TaskMetricDistributions = withUI { ui =>
+    val quantiles = quantileString.split(",").map { s =>
+      try {
+        s.toDouble
+      } catch {
+        case nfe: NumberFormatException =>
+          throw new BadParameterException("quantiles", "double", s)
+      }
+    }
+
+    ui.store.taskSummary(stageId, stageAttemptId, quantiles)
+  }
+
+  @GET
+  @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskList")
+  def taskList(
+      @PathParam("stageId") stageId: Int,
+      @PathParam("stageAttemptId") stageAttemptId: Int,
+      @DefaultValue("0") @QueryParam("offset") offset: Int,
+      @DefaultValue("20") @QueryParam("length") length: Int,
+      @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = {
+    withUI(_.store.taskList(stageId, stageAttemptId, offset, length, sortBy))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala
deleted file mode 100644
index 673da1c..0000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import javax.ws.rs._
-import javax.ws.rs.core.MediaType
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class VersionResource(ui: UIRoot) {
-
-  @GET
-  def getVersionInfo(): VersionInfo = new VersionInfo(
-    org.apache.spark.SPARK_VERSION
-  )
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala
deleted file mode 100644
index 3a51ae6..0000000
--- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.streaming
-
-import java.util.{ArrayList => JArrayList, Arrays => JArrays, Date, List => JList}
-import javax.ws.rs.{GET, Produces, QueryParam}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.status.api.v1.streaming.AllBatchesResource._
-import org.apache.spark.streaming.ui.StreamingJobProgressListener
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class AllBatchesResource(listener: StreamingJobProgressListener) {
-
-  @GET
-  def batchesList(@QueryParam("status") statusParams: JList[BatchStatus]): Seq[BatchInfo] = {
-    batchInfoList(listener, statusParams).sortBy(- _.batchId)
-  }
-}
-
-private[v1] object AllBatchesResource {
-
-  def batchInfoList(
-      listener: StreamingJobProgressListener,
-      statusParams: JList[BatchStatus] = new JArrayList[BatchStatus]()): Seq[BatchInfo] = {
-
-    listener.synchronized {
-      val statuses =
-        if (statusParams.isEmpty) JArrays.asList(BatchStatus.values(): _*) else statusParams
-      val statusToBatches = Seq(
-        BatchStatus.COMPLETED -> listener.retainedCompletedBatches,
-        BatchStatus.QUEUED -> listener.waitingBatches,
-        BatchStatus.PROCESSING -> listener.runningBatches
-      )
-
-      val batchInfos = for {
-        (status, batches) <- statusToBatches
-        batch <- batches if statuses.contains(status)
-      } yield {
-        val batchId = batch.batchTime.milliseconds
-        val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
-
-        new BatchInfo(
-          batchId = batchId,
-          batchTime = new Date(batchId),
-          status = status.toString,
-          batchDuration = listener.batchDuration,
-          inputSize = batch.numRecords,
-          schedulingDelay = batch.schedulingDelay,
-          processingTime = batch.processingDelay,
-          totalDelay = batch.totalDelay,
-          numActiveOutputOps = batch.numActiveOutputOp,
-          numCompletedOutputOps = batch.numCompletedOutputOp,
-          numFailedOutputOps = batch.numFailedOutputOp,
-          numTotalOutputOps = batch.outputOperations.size,
-          firstFailureReason = firstFailureReason
-        )
-      }
-
-      batchInfos
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala
deleted file mode 100644
index 0eb649f..0000000
--- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.streaming
-
-import java.util.Date
-import javax.ws.rs.{GET, PathParam, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.status.api.v1.NotFoundException
-import org.apache.spark.status.api.v1.streaming.AllOutputOperationsResource._
-import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.ui.StreamingJobProgressListener
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class AllOutputOperationsResource(listener: StreamingJobProgressListener) {
-
-  @GET
-  def operationsList(@PathParam("batchId") batchId: Long): Seq[OutputOperationInfo] = {
-    outputOperationInfoList(listener, batchId).sortBy(_.outputOpId)
-  }
-}
-
-private[v1] object AllOutputOperationsResource {
-
-  def outputOperationInfoList(
-      listener: StreamingJobProgressListener,
-      batchId: Long): Seq[OutputOperationInfo] = {
-
-    listener.synchronized {
-      listener.getBatchUIData(Time(batchId)) match {
-        case Some(batch) =>
-          for ((opId, op) <- batch.outputOperations) yield {
-            val jobIds = batch.outputOpIdSparkJobIdPairs
-              .filter(_.outputOpId == opId).map(_.sparkJobId).toSeq.sorted
-
-            new OutputOperationInfo(
-              outputOpId = opId,
-              name = op.name,
-              description = op.description,
-              startTime = op.startTime.map(new Date(_)),
-              endTime = op.endTime.map(new Date(_)),
-              duration = op.duration,
-              failureReason = op.failureReason,
-              jobIds = jobIds
-            )
-          }
-        case None => throw new NotFoundException("unknown batch: " + batchId)
-      }
-    }.toSeq
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala
deleted file mode 100644
index 5a276a9..0000000
--- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.streaming
-
-import java.util.Date
-import javax.ws.rs.{GET, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.status.api.v1.streaming.AllReceiversResource._
-import org.apache.spark.streaming.ui.StreamingJobProgressListener
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class AllReceiversResource(listener: StreamingJobProgressListener) {
-
-  @GET
-  def receiversList(): Seq[ReceiverInfo] = {
-    receiverInfoList(listener).sortBy(_.streamId)
-  }
-}
-
-private[v1] object AllReceiversResource {
-
-  def receiverInfoList(listener: StreamingJobProgressListener): Seq[ReceiverInfo] = {
-    listener.synchronized {
-      listener.receivedRecordRateWithBatchTime.map { case (streamId, eventRates) =>
-
-        val receiverInfo = listener.receiverInfo(streamId)
-        val streamName = receiverInfo.map(_.name)
-          .orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId")
-        val avgEventRate =
-          if (eventRates.isEmpty) None else Some(eventRates.map(_._2).sum / eventRates.size)
-
-        val (errorTime, errorMessage, error) = receiverInfo match {
-          case None => (None, None, None)
-          case Some(info) =>
-            val someTime =
-              if (info.lastErrorTime >= 0) Some(new Date(info.lastErrorTime)) else None
-            val someMessage =
-              if (info.lastErrorMessage.length > 0) Some(info.lastErrorMessage) else None
-            val someError =
-              if (info.lastError.length > 0) Some(info.lastError) else None
-
-            (someTime, someMessage, someError)
-        }
-
-        new ReceiverInfo(
-          streamId = streamId,
-          streamName = streamName,
-          isActive = receiverInfo.map(_.active),
-          executorId = receiverInfo.map(_.executorId),
-          executorHost = receiverInfo.map(_.location),
-          lastErrorTime = errorTime,
-          lastErrorMessage = errorMessage,
-          lastError = error,
-          avgEventRate = avgEventRate,
-          eventRates = eventRates
-        )
-      }.toSeq
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala
index aea75d5..07d8164 100644
--- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala
@@ -19,24 +19,39 @@ package org.apache.spark.status.api.v1.streaming
 
 import javax.ws.rs.{Path, PathParam}
 
-import org.apache.spark.status.api.v1.ApiRequestContext
+import org.apache.spark.status.api.v1._
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
 
 @Path("/v1")
 private[v1] class ApiStreamingApp extends ApiRequestContext {
 
   @Path("applications/{appId}/streaming")
-  def getStreamingRoot(@PathParam("appId") appId: String): ApiStreamingRootResource = {
-    withSparkUI(appId, None) { ui =>
-      new ApiStreamingRootResource(ui)
-    }
+  def getStreamingRoot(@PathParam("appId") appId: String): Class[ApiStreamingRootResource] = {
+    classOf[ApiStreamingRootResource]
   }
 
   @Path("applications/{appId}/{attemptId}/streaming")
   def getStreamingRoot(
       @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): ApiStreamingRootResource = {
-    withSparkUI(appId, Some(attemptId)) { ui =>
-      new ApiStreamingRootResource(ui)
+      @PathParam("attemptId") attemptId: String): Class[ApiStreamingRootResource] = {
+    classOf[ApiStreamingRootResource]
+  }
+}
+
+/**
+ * Base class for streaming API handlers, provides easy access to the streaming listener that
+ * holds the app's information.
+ */
+private[v1] trait BaseStreamingAppResource extends BaseAppResource {
+
+  protected def withListener[T](fn: StreamingJobProgressListener => T): T = withUI { ui =>
+    val listener = ui.getStreamingJobProgressListener match {
+      case Some(listener) => listener.asInstanceOf[StreamingJobProgressListener]
+      case None => throw new NotFoundException("no streaming listener attached to " + ui.getAppName)
+    }
+    listener.synchronized {
+      fn(listener)
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala
index 1ccd586..a2571b9 100644
--- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala
@@ -17,58 +17,180 @@
 
 package org.apache.spark.status.api.v1.streaming
 
-import javax.ws.rs.Path
+import java.util.{Arrays => JArrays, Collections, Date, List => JList}
+import javax.ws.rs.{GET, Path, PathParam, Produces, QueryParam}
+import javax.ws.rs.core.MediaType
 
 import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.ui.StreamingJobProgressListener
+import org.apache.spark.streaming.ui.StreamingJobProgressListener._
 import org.apache.spark.ui.SparkUI
 
-private[v1] class ApiStreamingRootResource(ui: SparkUI) {
-
-  import org.apache.spark.status.api.v1.streaming.ApiStreamingRootResource._
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class ApiStreamingRootResource extends BaseStreamingAppResource {
 
+  @GET
   @Path("statistics")
-  def getStreamingStatistics(): StreamingStatisticsResource = {
-    new StreamingStatisticsResource(getListener(ui))
+  def streamingStatistics(): StreamingStatistics = withListener { listener =>
+    val batches = listener.retainedBatches
+    val avgInputRate = avgRate(batches.map(_.numRecords * 1000.0 / listener.batchDuration))
+    val avgSchedulingDelay = avgTime(batches.flatMap(_.schedulingDelay))
+    val avgProcessingTime = avgTime(batches.flatMap(_.processingDelay))
+    val avgTotalDelay = avgTime(batches.flatMap(_.totalDelay))
+
+    new StreamingStatistics(
+      startTime = new Date(listener.startTime),
+      batchDuration = listener.batchDuration,
+      numReceivers = listener.numReceivers,
+      numActiveReceivers = listener.numActiveReceivers,
+      numInactiveReceivers = listener.numInactiveReceivers,
+      numTotalCompletedBatches = listener.numTotalCompletedBatches,
+      numRetainedCompletedBatches = listener.retainedCompletedBatches.size,
+      numActiveBatches = listener.numUnprocessedBatches,
+      numProcessedRecords = listener.numTotalProcessedRecords,
+      numReceivedRecords = listener.numTotalReceivedRecords,
+      avgInputRate = avgInputRate,
+      avgSchedulingDelay = avgSchedulingDelay,
+      avgProcessingTime = avgProcessingTime,
+      avgTotalDelay = avgTotalDelay
+    )
   }
 
+  @GET
   @Path("receivers")
-  def getReceivers(): AllReceiversResource = {
-    new AllReceiversResource(getListener(ui))
+  def receiversList(): Seq[ReceiverInfo] = withListener { listener =>
+    listener.receivedRecordRateWithBatchTime.map { case (streamId, eventRates) =>
+      val receiverInfo = listener.receiverInfo(streamId)
+      val streamName = receiverInfo.map(_.name)
+        .orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId")
+      val avgEventRate =
+        if (eventRates.isEmpty) None else Some(eventRates.map(_._2).sum / eventRates.size)
+
+      val (errorTime, errorMessage, error) = receiverInfo match {
+        case None => (None, None, None)
+        case Some(info) =>
+          val someTime =
+            if (info.lastErrorTime >= 0) Some(new Date(info.lastErrorTime)) else None
+          val someMessage =
+            if (info.lastErrorMessage.length > 0) Some(info.lastErrorMessage) else None
+          val someError =
+            if (info.lastError.length > 0) Some(info.lastError) else None
+
+          (someTime, someMessage, someError)
+      }
+
+      new ReceiverInfo(
+        streamId = streamId,
+        streamName = streamName,
+        isActive = receiverInfo.map(_.active),
+        executorId = receiverInfo.map(_.executorId),
+        executorHost = receiverInfo.map(_.location),
+        lastErrorTime = errorTime,
+        lastErrorMessage = errorMessage,
+        lastError = error,
+        avgEventRate = avgEventRate,
+        eventRates = eventRates
+      )
+    }.toSeq.sortBy(_.streamId)
   }
 
+  @GET
   @Path("receivers/{streamId: \\d+}")
-  def getReceiver(): OneReceiverResource = {
-    new OneReceiverResource(getListener(ui))
+  def oneReceiver(@PathParam("streamId") streamId: Int): ReceiverInfo = {
+    receiversList().find { _.streamId == streamId }.getOrElse(
+      throw new NotFoundException("unknown receiver: " + streamId))
   }
 
+  @GET
   @Path("batches")
-  def getBatches(): AllBatchesResource = {
-    new AllBatchesResource(getListener(ui))
+  def batchesList(@QueryParam("status") statusParams: JList[BatchStatus]): Seq[BatchInfo] = {
+    withListener { listener =>
+      val statuses =
+        if (statusParams.isEmpty) JArrays.asList(BatchStatus.values(): _*) else statusParams
+      val statusToBatches = Seq(
+        BatchStatus.COMPLETED -> listener.retainedCompletedBatches,
+        BatchStatus.QUEUED -> listener.waitingBatches,
+        BatchStatus.PROCESSING -> listener.runningBatches
+      )
+
+      val batchInfos = for {
+        (status, batches) <- statusToBatches
+        batch <- batches if statuses.contains(status)
+      } yield {
+        val batchId = batch.batchTime.milliseconds
+        val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
+
+        new BatchInfo(
+          batchId = batchId,
+          batchTime = new Date(batchId),
+          status = status.toString,
+          batchDuration = listener.batchDuration,
+          inputSize = batch.numRecords,
+          schedulingDelay = batch.schedulingDelay,
+          processingTime = batch.processingDelay,
+          totalDelay = batch.totalDelay,
+          numActiveOutputOps = batch.numActiveOutputOp,
+          numCompletedOutputOps = batch.numCompletedOutputOp,
+          numFailedOutputOps = batch.numFailedOutputOp,
+          numTotalOutputOps = batch.outputOperations.size,
+          firstFailureReason = firstFailureReason
+        )
+      }
+
+      batchInfos.sortBy(- _.batchId)
+    }
   }
 
+  @GET
   @Path("batches/{batchId: \\d+}")
-  def getBatch(): OneBatchResource = {
-    new OneBatchResource(getListener(ui))
+  def oneBatch(@PathParam("batchId") batchId: Long): BatchInfo = {
+    batchesList(Collections.emptyList()).find { _.batchId == batchId }.getOrElse(
+      throw new NotFoundException("unknown batch: " + batchId))
   }
 
+  @GET
   @Path("batches/{batchId: \\d+}/operations")
-  def getOutputOperations(): AllOutputOperationsResource = {
-    new AllOutputOperationsResource(getListener(ui))
+  def operationsList(@PathParam("batchId") batchId: Long): Seq[OutputOperationInfo] = {
+    withListener { listener =>
+      val ops = listener.getBatchUIData(Time(batchId)) match {
+        case Some(batch) =>
+          for ((opId, op) <- batch.outputOperations) yield {
+            val jobIds = batch.outputOpIdSparkJobIdPairs
+              .filter(_.outputOpId == opId).map(_.sparkJobId).toSeq.sorted
+
+            new OutputOperationInfo(
+              outputOpId = opId,
+              name = op.name,
+              description = op.description,
+              startTime = op.startTime.map(new Date(_)),
+              endTime = op.endTime.map(new Date(_)),
+              duration = op.duration,
+              failureReason = op.failureReason,
+              jobIds = jobIds
+            )
+          }
+        case None => throw new NotFoundException("unknown batch: " + batchId)
+      }
+      ops.toSeq
+    }
   }
 
+  @GET
   @Path("batches/{batchId: \\d+}/operations/{outputOpId: \\d+}")
-  def getOutputOperation(): OneOutputOperationResource = {
-    new OneOutputOperationResource(getListener(ui))
+  def oneOperation(
+      @PathParam("batchId") batchId: Long,
+      @PathParam("outputOpId") opId: OutputOpId): OutputOperationInfo = {
+    operationsList(batchId).find { _.outputOpId == opId }.getOrElse(
+      throw new NotFoundException("unknown output operation: " + opId))
   }
 
-}
+  private def avgRate(data: Seq[Double]): Option[Double] = {
+    if (data.isEmpty) None else Some(data.sum / data.size)
+  }
 
-private[v1] object ApiStreamingRootResource {
-  def getListener(ui: SparkUI): StreamingJobProgressListener = {
-    ui.getStreamingJobProgressListener match {
-      case Some(listener) => listener.asInstanceOf[StreamingJobProgressListener]
-      case None => throw new NotFoundException("no streaming listener attached to " + ui.getAppName)
-    }
+  private def avgTime(data: Seq[Long]): Option[Long] = {
+    if (data.isEmpty) None else Some(data.sum / data.size)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala
deleted file mode 100644
index d3c689c..0000000
--- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.streaming
-
-import javax.ws.rs.{GET, PathParam, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.status.api.v1.NotFoundException
-import org.apache.spark.streaming.ui.StreamingJobProgressListener
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class OneBatchResource(listener: StreamingJobProgressListener) {
-
-  @GET
-  def oneBatch(@PathParam("batchId") batchId: Long): BatchInfo = {
-    val someBatch = AllBatchesResource.batchInfoList(listener)
-      .find { _.batchId == batchId }
-    someBatch.getOrElse(throw new NotFoundException("unknown batch: " + batchId))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala
deleted file mode 100644
index aabcdb2..0000000
--- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.streaming
-
-import javax.ws.rs.{GET, PathParam, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.status.api.v1.NotFoundException
-import org.apache.spark.streaming.ui.StreamingJobProgressListener
-import org.apache.spark.streaming.ui.StreamingJobProgressListener._
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class OneOutputOperationResource(listener: StreamingJobProgressListener) {
-
-  @GET
-  def oneOperation(
-      @PathParam("batchId") batchId: Long,
-      @PathParam("outputOpId") opId: OutputOpId): OutputOperationInfo = {
-
-    val someOutputOp = AllOutputOperationsResource.outputOperationInfoList(listener, batchId)
-      .find { _.outputOpId == opId }
-    someOutputOp.getOrElse(throw new NotFoundException("unknown output operation: " + opId))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala
deleted file mode 100644
index c0cc99d..0000000
--- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.streaming
-
-import javax.ws.rs.{GET, PathParam, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.status.api.v1.NotFoundException
-import org.apache.spark.streaming.ui.StreamingJobProgressListener
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class OneReceiverResource(listener: StreamingJobProgressListener) {
-
-  @GET
-  def oneReceiver(@PathParam("streamId") streamId: Int): ReceiverInfo = {
-    val someReceiver = AllReceiversResource.receiverInfoList(listener)
-      .find { _.streamId == streamId }
-    someReceiver.getOrElse(throw new NotFoundException("unknown receiver: " + streamId))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala
deleted file mode 100644
index 6cff87b..0000000
--- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.streaming
-
-import java.util.Date
-import javax.ws.rs.{GET, Produces}
-import javax.ws.rs.core.MediaType
-
-import org.apache.spark.streaming.ui.StreamingJobProgressListener
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class StreamingStatisticsResource(listener: StreamingJobProgressListener) {
-
-  @GET
-  def streamingStatistics(): StreamingStatistics = {
-    listener.synchronized {
-      val batches = listener.retainedBatches
-      val avgInputRate = avgRate(batches.map(_.numRecords * 1000.0 / listener.batchDuration))
-      val avgSchedulingDelay = avgTime(batches.flatMap(_.schedulingDelay))
-      val avgProcessingTime = avgTime(batches.flatMap(_.processingDelay))
-      val avgTotalDelay = avgTime(batches.flatMap(_.totalDelay))
-
-      new StreamingStatistics(
-        startTime = new Date(listener.startTime),
-        batchDuration = listener.batchDuration,
-        numReceivers = listener.numReceivers,
-        numActiveReceivers = listener.numActiveReceivers,
-        numInactiveReceivers = listener.numInactiveReceivers,
-        numTotalCompletedBatches = listener.numTotalCompletedBatches,
-        numRetainedCompletedBatches = listener.retainedCompletedBatches.size,
-        numActiveBatches = listener.numUnprocessedBatches,
-        numProcessedRecords = listener.numTotalProcessedRecords,
-        numReceivedRecords = listener.numTotalReceivedRecords,
-        avgInputRate = avgInputRate,
-        avgSchedulingDelay = avgSchedulingDelay,
-        avgProcessingTime = avgProcessingTime,
-        avgTotalDelay = avgTotalDelay
-      )
-    }
-  }
-
-  private def avgRate(data: Seq[Double]): Option[Double] = {
-    if (data.isEmpty) None else Some(data.sum / data.size)
-  }
-
-  private def avgTime(data: Seq[Long]): Option[Long] = {
-    if (data.isEmpty) None else Some(data.sum / data.size)
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message