spark-commits mailing list archives

From iras...@apache.org
Subject spark git commit: [SPARK-7889][WEBUI] HistoryServer updates UI for incomplete apps
Date Fri, 12 Feb 2016 03:38:04 GMT
Repository: spark
Updated Branches:
  refs/heads/master d3e2e2029 -> a2c7dcf61


[SPARK-7889][WEBUI] HistoryServer updates UI for incomplete apps

When the HistoryServer is showing an incomplete app, it needs to check if there is a newer version of the app available.  It does this by checking if a version of the app has been loaded with a larger *filesize*.  If so, it detaches the current UI, attaches the new one, and redirects back to the same URL to show the new UI.
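
As a rough illustration of the file-size check described above, the sketch below binds a probe to the log size seen when a UI was loaded and reports an update once a larger size is known. It is not the code from this commit: `FileSizeProbeSketch`, `latestSizes`, and `probeFor` are hypothetical names, and a plain in-memory map stands in for the provider's scan of the log directory.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the history provider's view of the event logs:
// log path -> most recently observed file size in bytes.
object FileSizeProbeSketch {
  val latestSizes = mutable.Map[String, Long]()

  /**
   * Build a probe bound to the size that was seen when the UI was loaded.
   * The probe returns true only once a strictly larger size has been recorded.
   */
  def probeFor(path: String, loadedSize: Long): () => Boolean =
    () => latestSizes.get(path).exists(_ > loadedSize)
}
```

A probe built this way is only meaningful for the UI it was created with; once a newer UI is loaded, a fresh probe bound to the new size would be needed.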

https://issues.apache.org/jira/browse/SPARK-7889

Author: Steve Loughran <stevel@hortonworks.com>
Author: Imran Rashid <irashid@cloudera.com>

Closes #11118 from squito/SPARK-7889-alternate.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2c7dcf6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2c7dcf6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2c7dcf6

Branch: refs/heads/master
Commit: a2c7dcf61f33fa1897c950d2d905651103c170ea
Parents: d3e2e20
Author: Steve Loughran <stevel@hortonworks.com>
Authored: Thu Feb 11 21:37:53 2016 -0600
Committer: Imran Rashid <irashid@cloudera.com>
Committed: Thu Feb 11 21:37:53 2016 -0600

----------------------------------------------------------------------
 .../spark/deploy/history/ApplicationCache.scala | 665 +++++++++++++++++++
 .../history/ApplicationHistoryProvider.scala    |  42 +-
 .../deploy/history/FsHistoryProvider.scala      | 149 ++++-
 .../spark/deploy/history/HistoryPage.scala      |   2 +-
 .../spark/deploy/history/HistoryServer.scala    |  78 ++-
 .../spark/scheduler/EventLoggingListener.scala  |   7 +
 .../deploy/history/ApplicationCacheSuite.scala  | 488 ++++++++++++++
 .../deploy/history/HistoryServerSuite.scala     | 224 ++++++-
 docs/monitoring.md                              |  70 +-
 project/MimaExcludes.scala                      |   3 +
 10 files changed, 1654 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7dcf6/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
new file mode 100644
index 0000000..e2fda29
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for them.
+ * Incomplete applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in [[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+    val operations: ApplicationCacheOperations,
+    val retainedApplications: Int,
+    val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+    /** the cache key doesn't match a cached entry, or the entry is out-of-date, so load it. */
+    override def load(key: CacheKey): CacheEntry = {
+      loadApplicationEntry(key.appId, key.attemptId)
+    }
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+    /**
+     * Removal event notifies the provider to detach the UI.
+     * @param rm removal notification
+     */
+    override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
+      metrics.evictionCount.inc()
+      val key = rm.getKey
+      logDebug(s"Evicting entry ${key}")
+      operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+    }
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access it directly.
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+    CacheBuilder.newBuilder()
+        .maximumSize(retainedApplications)
+        .removalListener(removalListener)
+        .build(appLoader)
+  }
+
+  /**
+   * The metrics which are updated as the cache is used.
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+    ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**
+   * Stop the cache.
+   * This will reset the relay in [[ApplicationCacheCheckFilterRelay]].
+   */
+  def stop(): Unit = {
+    ApplicationCacheCheckFilterRelay.resetApplicationCache()
+  }
+
+  /**
+   * Get an entry.
+   *
+   * Cache fetch/refresh will have taken place by the time this method returns.
+   * @param appAndAttempt application to look up in the format needed by the history server web UI,
+   *                      `appId/attemptId` or `appId`.
+   * @return the entry
+   */
+  def get(appAndAttempt: String): SparkUI = {
+    val parts = splitAppAndAttemptKey(appAndAttempt)
+    get(parts._1, parts._2)
+  }
+
+  /**
+   * Get the Spark UI, converting a lookup failure from an exception to `None`.
+   * @param appAndAttempt application to look up in the format needed by the history server web UI,
+   *                      `appId/attemptId` or `appId`.
+   * @return the entry
+   */
+  def getSparkUI(appAndAttempt: String): Option[SparkUI] = {
+    try {
+      val ui = get(appAndAttempt)
+      Some(ui)
+    } catch {
+      case NonFatal(e) => e.getCause() match {
+        case nsee: NoSuchElementException =>
+          None
+        case cause: Exception => throw cause
+      }
+    }
+  }
+
+  /**
+   * Get the associated spark UI.
+   *
+   * Cache fetch/refresh will have taken place by the time this method returns.
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @return the entry
+   */
+  def get(appId: String, attemptId: Option[String]): SparkUI = {
+    lookupAndUpdate(appId, attemptId)._1.ui
+  }
+
+  /**
+   * Look up the entry; update it if needed.
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @return the underlying cache entry, which can have its timestamp changed, and a flag to
+   *         indicate that the entry has changed
+   */
+  private def lookupAndUpdate(appId: String, attemptId: Option[String]): (CacheEntry, Boolean) = {
+    metrics.lookupCount.inc()
+    val cacheKey = CacheKey(appId, attemptId)
+    var entry = appCache.getIfPresent(cacheKey)
+    var updated = false
+    if (entry == null) {
+      // no entry, so fetch without any post-fetch probes for out-of-dateness
+      // this will trigger a callback to loadApplicationEntry()
+      entry = appCache.get(cacheKey)
+    } else if (!entry.completed) {
+      val now = clock.getTimeMillis()
+      log.debug(s"Probing at time $now for updated application $cacheKey -> $entry")
+      metrics.updateProbeCount.inc()
+      updated = time(metrics.updateProbeTimer) {
+        entry.updateProbe()
+      }
+      if (updated) {
+        logDebug(s"refreshing $cacheKey")
+        metrics.updateTriggeredCount.inc()
+        appCache.refresh(cacheKey)
+        // and repeat the lookup
+        entry = appCache.get(cacheKey)
+      } else {
+        // update the probe timestamp to the current time
+        entry.probeTime = now
+      }
+    }
+    (entry, updated)
+  }
+
+  /**
+   * This method is visible for testing.
+   *
+   * It looks up the cached entry *and returns a clone of it*.
+   * This ensures that the cached entries never leak
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @return a new entry with shared SparkUI, but copies of the other fields.
+   */
+  def lookupCacheEntry(appId: String, attemptId: Option[String]): CacheEntry = {
+    val entry = lookupAndUpdate(appId, attemptId)._1
+    new CacheEntry(entry.ui, entry.completed, entry.updateProbe, entry.probeTime)
+  }
+
+  /**
+   * Probe for an application being updated.
+   * @param appId application ID
+   * @param attemptId attempt ID
+   * @return true if an update has been triggered
+   */
+  def checkForUpdates(appId: String, attemptId: Option[String]): Boolean = {
+    val (entry, updated) = lookupAndUpdate(appId, attemptId)
+    updated
+  }
+
+  /**
+   * Size probe, primarily for testing.
+   * @return size
+   */
+  def size(): Long = appCache.size()
+
+  /**
+   * Emptiness predicate, primarily for testing.
+   * @return true if the cache is empty
+   */
+  def isEmpty: Boolean = appCache.size() == 0
+
+  /**
+   * Time a closure, returning its output.
+   * @param t timer
+   * @param f function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](t: Timer)(f: => T): T = {
+    val timeCtx = t.time()
+    try {
+      f
+    } finally {
+      timeCtx.close()
+    }
+  }
+
+  /**
+   * Load the Spark UI via [[ApplicationCacheOperations.getAppUI()]],
+   * then attach it to the web UI via [[ApplicationCacheOperations.attachSparkUI()]].
+   *
+   * If the application is incomplete, it has the [[ApplicationCacheCheckFilter]]
+   * added as a filter to the HTTP requests, so that queries on the UI will trigger
+   * update checks.
+   *
+   * The generated entry contains the UI and the current timestamp.
+   * The timer [[metrics.loadTimer]] tracks the time taken to load the UI.
+   *
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @return the cache entry
+   * @throws NoSuchElementException if there is no matching element
+   */
+  @throws[NoSuchElementException]
+  def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = {
+
+    logDebug(s"Loading application Entry $appId/$attemptId")
+    metrics.loadCount.inc()
+    time(metrics.loadTimer) {
+      operations.getAppUI(appId, attemptId) match {
+        case Some(LoadedAppUI(ui, updateState)) =>
+          val completed = ui.getApplicationInfoList.exists(_.attempts.last.completed)
+          if (completed) {
+            // completed spark UIs are attached directly
+            operations.attachSparkUI(appId, attemptId, ui, completed)
+          } else {
+            // incomplete UIs have the cache-check filter put in front of them.
+            ApplicationCacheCheckFilterRelay.registerFilter(ui, appId, attemptId)
+            operations.attachSparkUI(appId, attemptId, ui, completed)
+          }
+          // build the cache entry
+          val now = clock.getTimeMillis()
+          val entry = new CacheEntry(ui, completed, updateState, now)
+          logDebug(s"Loaded application $appId/$attemptId -> $entry")
+          entry
+        case None =>
+          metrics.lookupFailureCount.inc()
+          // guava's cache logs via java.util log, so is of limited use. Hence: our own message
+          logInfo(s"Failed to load application attempt $appId/$attemptId")
+          throw new NoSuchElementException(s"no application with application Id '$appId'" +
+              attemptId.map { id => s" attemptId '$id'" }.getOrElse(" and no attempt Id"))
+      }
+    }
+  }
+
+  /**
+   * Split up an `applicationId/attemptId` or `applicationId` key into the separate pieces.
+   *
+   * @param appAndAttempt combined key
+   * @return a tuple of the application ID and, if present, the attemptID
+   */
+  def splitAppAndAttemptKey(appAndAttempt: String): (String, Option[String]) = {
+    val parts = appAndAttempt.split("/")
+    require(parts.length == 1 || parts.length == 2, s"Invalid app key $appAndAttempt")
+    val appId = parts(0)
+    val attemptId = if (parts.length > 1) Some(parts(1)) else None
+    (appId, attemptId)
+  }
+
+  /**
+   * Merge an appId and optional attempt Id into a single key.
+   *
+   * The key is `applicationId/attemptId` if there is an `attemptId`; `applicationId` if not.
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @return a unified string
+   */
+  def mergeAppAndAttemptToKey(appId: String, attemptId: Option[String]): String = {
+    appId + attemptId.map { id => s"/$id" }.getOrElse("")
+  }
+
+  /**
+   * String operator dumps the cache entries and metrics.
+   * @return a string value, primarily for testing and diagnostics
+   */
+  override def toString: String = {
+    val sb = new StringBuilder(s"ApplicationCache(" +
+          s" retainedApplications= $retainedApplications)")
+    sb.append(s"; time= ${clock.getTimeMillis()}")
+    sb.append(s"; entry count= ${appCache.size()}\n")
+    sb.append("----\n")
+    appCache.asMap().asScala.foreach {
+      case(key, entry) => sb.append(s"  $key -> $entry\n")
+    }
+    sb.append("----\n")
+    sb.append(metrics)
+    sb.append("----\n")
+    sb.toString()
+  }
+}
+
+/**
+ * An entry in the cache.
+ *
+ * @param ui Spark UI
+ * @param completed Flag to indicate that the application has completed (and so
+ *                 does not need refreshing).
+ * @param updateProbe function to call to see if the application has been updated and
+ *                    therefore that the cached value needs to be refreshed.
+ * @param probeTime Time in milliseconds when the probe was last executed.
+ */
+private[history] final class CacheEntry(
+    val ui: SparkUI,
+    val completed: Boolean,
+    val updateProbe: () => Boolean,
+    var probeTime: Long) {
+
+  /** string value is for test assertions */
+  override def toString: String = {
+    s"UI $ui, completed=$completed, probeTime=$probeTime"
+  }
+}
+
+/**
+ * Cache key: compares on `appId` and then, if non-empty, `attemptId`.
+ * The [[hashCode()]] function uses the same fields.
+ * @param appId application ID
+ * @param attemptId attempt ID
+ */
+private[history] final case class CacheKey(appId: String, attemptId: Option[String]) {
+
+  override def toString: String = {
+    appId + attemptId.map { id => s"/$id" }.getOrElse("")
+  }
+}
+
+/**
+ * Metrics of the cache
+ * @param prefix prefix to register all entries under
+ */
+private[history] class CacheMetrics(prefix: String) extends Source {
+
+  /* metrics: counters and timers */
+  val lookupCount = new Counter()
+  val lookupFailureCount = new Counter()
+  val evictionCount = new Counter()
+  val loadCount = new Counter()
+  val loadTimer = new Timer()
+  val updateProbeCount = new Counter()
+  val updateProbeTimer = new Timer()
+  val updateTriggeredCount = new Counter()
+
+  /** all the counters: for registration and string conversion. */
+  private val counters = Seq(
+    ("lookup.count", lookupCount),
+    ("lookup.failure.count", lookupFailureCount),
+    ("eviction.count", evictionCount),
+    ("load.count", loadCount),
+    ("update.probe.count", updateProbeCount),
+    ("update.triggered.count", updateTriggeredCount))
+
+  /** all metrics, including timers */
+  private val allMetrics = counters ++ Seq(
+    ("load.timer", loadTimer),
+    ("update.probe.timer", updateProbeTimer))
+
+  /**
+   * Name of metric source
+   */
+  override val sourceName = "ApplicationCache"
+
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * Startup actions.
+   * This includes registering metrics with [[metricRegistry]]
+   */
+  private def init(): Unit = {
+    allMetrics.foreach { case (name, metric) =>
+      metricRegistry.register(MetricRegistry.name(prefix, name), metric)
+    }
+  }
+
+  override def toString: String = {
+    val sb = new StringBuilder()
+    counters.foreach { case (name, counter) =>
+      sb.append(name).append(" = ").append(counter.getCount).append('\n')
+    }
+    sb.toString()
+  }
+}
+
+/**
+ * API for cache events. That is: loading an App UI; and for
+ * attaching/detaching the UI to and from the Web UI.
+ */
+private[history] trait ApplicationCacheOperations {
+
+  /**
+   * Get the application UI and the probe needed to see if it has been updated.
+   * @param appId application ID
+   * @param attemptId attempt ID
+   * @return If found, the Spark UI and any history information to be used in the cache
+   */
+  def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI]
+
+  /**
+   * Attach a reconstructed UI.
+   * @param appId application ID
+   * @param attemptId attempt ID
+   * @param ui UI
+   * @param completed flag to indicate that the UI has completed
+   */
+  def attachSparkUI(
+      appId: String,
+      attemptId: Option[String],
+      ui: SparkUI,
+      completed: Boolean): Unit
+
+  /**
+   * Detach a Spark UI.
+   *
+   * @param ui Spark UI
+   */
+  def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit
+
+}
+
+/**
+ * This is a servlet filter which intercepts HTTP requests on application UIs and
+ * triggers checks for updated data.
+ *
+ * If the application cache indicates that the application has been updated,
+ * the filter returns a 302 redirect to the caller, asking them to re-request the web
+ * page.
+ *
+ * Because the application cache will detach and then re-attach the UI, when the caller
+ * repeats that request, it will now pick up the newly-updated web application.
+ *
+ * This does require the caller to handle 302 responses. Because of the ambiguity
+ * in how POST and PUT operations are responded to (that is, should a 307 be
+ * processed directly), the filter <i>does not</i> filter those requests.
+ * As the current web UIs are read-only, this is not an issue. If it were ever to
+ * support more HTTP verbs, then some support may be required: perhaps, rather
+ * than sending a redirect, simply updating the value so that the <i>next</i>
+ * request will pick it up.
+ *
+ * Implementation note: there's some abuse of a shared global entry here because
+ * the configuration data passed to the servlet is just a string:string map.
+ */
+private[history] class ApplicationCacheCheckFilter() extends Filter with Logging {
+
+  import ApplicationCacheCheckFilterRelay._
+  var appId: String = _
+  var attemptId: Option[String] = _
+
+  /**
+   * Bind the app and attempt ID, throwing an exception if no application ID was provided.
+   * @param filterConfig configuration
+   */
+  override def init(filterConfig: FilterConfig): Unit = {
+
+    appId = Option(filterConfig.getInitParameter(APP_ID))
+      .getOrElse(throw new ServletException(s"Missing Parameter $APP_ID"))
+    attemptId = Option(filterConfig.getInitParameter(ATTEMPT_ID))
+    logDebug(s"initializing filter $this")
+  }
+
+  /**
+   * Filter the request.
+   * Either the caller is given a 302 redirect to the current URL, or the
+   * request is passed on to the SparkUI servlets.
+   *
+   * @param request HttpServletRequest
+   * @param response HttpServletResponse
+   * @param chain the rest of the request chain
+   */
+  override def doFilter(
+      request: ServletRequest,
+      response: ServletResponse,
+      chain: FilterChain): Unit = {
+
+    // nobody has ever implemented any other kind of servlet, yet
+    // this check is universal, just in case someone does exactly
+    // that on your classpath
+    if (!(request.isInstanceOf[HttpServletRequest])) {
+      throw new ServletException("This filter only works for HTTP/HTTPS")
+    }
+    val httpRequest = request.asInstanceOf[HttpServletRequest]
+    val httpResponse = response.asInstanceOf[HttpServletResponse]
+    val requestURI = httpRequest.getRequestURI
+    val operation = httpRequest.getMethod
+
+    // if the request is for an attempt, check to see if it is in need of delete/refresh
+    // and have the cache update the UI if so
+    if ((operation == "HEAD" || operation == "GET")
+        && checkForUpdates(requestURI, appId, attemptId)) {
+      // send a redirect back to the same location. This will be routed
+      // to the *new* UI
+      logInfo(s"Application Attempt $appId/$attemptId updated; refreshing")
+      val queryStr = Option(httpRequest.getQueryString).map("?" + _).getOrElse("")
+      val redirectUrl = httpResponse.encodeRedirectURL(requestURI + queryStr)
+      httpResponse.sendRedirect(redirectUrl)
+    } else {
+      chain.doFilter(request, response)
+    }
+  }
+
+  override def destroy(): Unit = {
+  }
+
+  override def toString: String = s"ApplicationCacheCheckFilter for $appId/$attemptId"
+}
+
+/**
+ * Global state for the [[ApplicationCacheCheckFilter]] instances, so that they can relay cache
+ * probes to the cache.
+ *
+ * This is an ugly workaround for the limitation of servlets and filters in the Java servlet
+ * API; they are still configured on the model of a list of classnames and configuration
+ * strings in a `web.xml` field, rather than a chain of instances wired up by hand or
+ * via an injection framework. There is no way to directly configure a servlet filter instance
+ * with a reference to the application cache which it must use: some global state is needed.
+ *
+ * Here, [[ApplicationCacheCheckFilterRelay]] is that global state; it relays all requests
+ * to the singleton [[ApplicationCache]].
+ *
+ * The field `applicationCache` must be set for the filters to work -
+ * this is done during the construction of [[ApplicationCache]], which requires that there
+ * is only one cache serving requests through the WebUI.
+ *
+ * *Important* In test runs, if there is more than one [[ApplicationCache]], the relay logic
+ * will break: filters may not find instances. Tests must not do that.
+ *
+ */
+private[history] object ApplicationCacheCheckFilterRelay extends Logging {
+  // name of the app ID entry in the filter configuration. Mandatory.
+  val APP_ID = "appId"
+
+  // name of the attempt ID entry in the filter configuration. Optional.
+  val ATTEMPT_ID = "attemptId"
+
+  // name of the filter to register
+  val FILTER_NAME = "org.apache.spark.deploy.history.ApplicationCacheCheckFilter"
+
+  /** the application cache to relay requests to */
+  @volatile
+  private var applicationCache: Option[ApplicationCache] = None
+
+  /**
+   * Set the application cache. Logs a warning if it is overwriting an existing value
+   * @param cache new cache
+   */
+  def setApplicationCache(cache: ApplicationCache): Unit = {
+    applicationCache.foreach( c => logWarning(s"Overwriting application cache $c"))
+    applicationCache = Some(cache)
+  }
+
+  /**
+   * Reset the application cache
+   */
+  def resetApplicationCache(): Unit = {
+    applicationCache = None
+  }
+
+  /**
+   * Check to see if there has been an update
+   * @param requestURI URI the request came in on
+   * @param appId application ID
+   * @param attemptId attempt ID
+   * @return true if an update was loaded for the app/attempt
+   */
+  def checkForUpdates(requestURI: String, appId: String, attemptId: Option[String]): Boolean = {
+
+    logDebug(s"Checking $appId/$attemptId from $requestURI")
+    applicationCache match {
+      case Some(cache) =>
+        try {
+          cache.checkForUpdates(appId, attemptId)
+        } catch {
+          case ex: Exception =>
+            // something went wrong. Keep going with the existing UI
+            logWarning(s"When checking for $appId/$attemptId from $requestURI", ex)
+            false
+        }
+
+      case None =>
+        logWarning("No application cache instance defined")
+        false
+    }
+  }
+
+
+  /**
+   * Register a filter for the web UI which checks for updates to the given app/attempt
+   * @param ui Spark UI to attach filters to
+   * @param appId application ID
+   * @param attemptId attempt ID
+   */
+  def registerFilter(
+      ui: SparkUI,
+      appId: String,
+      attemptId: Option[String] ): Unit = {
+    require(ui != null)
+    val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST)
+    val holder = new FilterHolder()
+    holder.setClassName(FILTER_NAME)
+    holder.setInitParameter(APP_ID, appId)
+    attemptId.foreach( id => holder.setInitParameter(ATTEMPT_ID, id))
+    require(ui.getHandlers != null, "null handlers")
+    ui.getHandlers.foreach { handler =>
+      handler.addFilter(holder, "/*", enumDispatcher)
+    }
+  }
+}

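The probe-then-refresh lookup in `ApplicationCache` above can be sketched without Guava or the servlet machinery. This is a simplified illustration under stated assumptions, not the committed implementation: `LookupSketch`, `Entry`, and `ProbingCache` are hypothetical names, a mutable map replaces the `LoadingCache`, and eviction, metrics, and probe timestamps are omitted.

```scala
import scala.collection.mutable

object LookupSketch {
  // Hypothetical, trimmed-down cache entry: a value, a completed flag, and an update probe.
  final case class Entry(value: String, completed: Boolean, updateProbe: () => Boolean)

  // Minimal cache that reloads incomplete entries whose probe reports an update.
  class ProbingCache(load: String => Entry) {
    private val entries = mutable.Map[String, Entry]()
    var loadCount = 0

    // Load (or reload) the entry for a key and store it.
    private def reload(key: String): Entry = {
      loadCount += 1
      val entry = load(key)
      entries(key) = entry
      entry
    }

    /** Returns the entry plus a flag saying whether a refresh was triggered. */
    def lookupAndUpdate(key: String): (Entry, Boolean) = entries.get(key) match {
      // no entry yet: load it, without probing
      case None => (reload(key), false)
      // incomplete entry whose probe reports newer data: refresh it
      case Some(e) if !e.completed && e.updateProbe() => (reload(key), true)
      // completed, or incomplete but unchanged: serve the cached entry
      case Some(e) => (e, false)
    }
  }
}
```

Completed entries are never probed in this sketch, which mirrors the idea that only incomplete applications can change after they have been loaded.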
http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7dcf6/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 5f5e0fe..44661ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -33,7 +33,42 @@ private[spark] case class ApplicationAttemptInfo(
 private[spark] case class ApplicationHistoryInfo(
     id: String,
     name: String,
-    attempts: List[ApplicationAttemptInfo])
+    attempts: List[ApplicationAttemptInfo]) {
+
+  /**
+   * Has this application completed?
+   * @return true if the most recent attempt has completed
+   */
+  def completed: Boolean = {
+    attempts.nonEmpty && attempts.head.completed
+  }
+}
+
+/**
+ *  A probe which can be invoked to see if a loaded Web UI has been updated.
+ *  The probe is valid only for the UI returned in the same [[LoadedAppUI]]
+ *  instance. That is, whenever a new UI is loaded, the probe returned with
+ *  it is the one that must be used to check for it being out of date;
+ *  previous probes must be discarded.
+ */
+private[history] abstract class HistoryUpdateProbe {
+  /**
+   * Return true if the history provider has a later version of the application
+   * attempt than the one against which this probe was constructed.
+   * @return true if a later version of the attempt is available
+   */
+  def isUpdated(): Boolean
+}
+
+/**
+ * All the information returned from a call to `getAppUI()`: the new UI
+ * and any required update state.
+ * @param ui Spark UI
+ * @param updateProbe probe to call to check on the update state of this application attempt
+ */
+private[history] case class LoadedAppUI(
+    ui: SparkUI,
+    updateProbe: () => Boolean)
 
 private[history] abstract class ApplicationHistoryProvider {
 
@@ -49,9 +84,10 @@ private[history] abstract class ApplicationHistoryProvider {
    *
    * @param appId The application ID.
    * @param attemptId The application attempt ID (or None if there is no attempt ID).
-   * @return The application's UI, or None if application is not found.
+   * @return a [[LoadedAppUI]] instance containing the application's UI and any state information
+   *         for update probes, or `None` if the application/attempt is not found.
    */
-  def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]
+  def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI]
 
   /**
    * Called when the server is shutting down.

http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7dcf6/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9648959..f885798 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
+import java.io.{FileNotFoundException, IOException, OutputStream}
 import java.util.UUID
 import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -33,7 +33,6 @@ import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -42,6 +41,31 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
  * A class that provides application history from event logs stored in the file system.
  * This provider checks for new finished applications in the background periodically and
  * renders the history application UI by parsing the associated event logs.
+ *
+ * == How new and updated attempts are detected ==
+ *
+ * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any
+ * entries in the log dir whose modification time is greater than the last scan time
+ * are considered new or updated. These are replayed to create a new [[FsApplicationAttemptInfo]]
+ * entry and update or create a matching [[FsApplicationHistoryInfo]] element in the list
+ * of applications.
+ * - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the
+ * [[FsApplicationAttemptInfo]] is replaced by another one with a larger log size.
+ * - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]]
+ * instance is out of date, the log size of the cached instance is checked against the app last
+ * loaded by [[checkForLogs]].
+ *
+ * The use of log size, rather than simply relying on modification times, is needed to
+ * address the following issues:
+ * - some filesystems do not appear to update the `modtime` value whenever data is flushed to
+ * an open file output stream. Changes to the history may not be picked up.
+ * - the granularity of the `modtime` field may be 2+ seconds. Rapid changes to the FS can be
+ * missed.
+ *
+ * Tracking filesize works given the following invariant: the logs get bigger
+ * as new events are added. If a format was used in which this did not hold, the mechanism would
+ * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly
+ * maintains this invariant.
  */
 private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   extends ApplicationHistoryProvider with Logging {
@@ -77,9 +101,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
     .setNameFormat("spark-history-task-%d").setDaemon(true).build())
 
-  // The modification time of the newest log detected during the last scan. This is used
-  // to ignore logs that are older during subsequent scans, to avoid processing data that
-  // is already known.
+  // The modification time of the newest log detected during the last scan. Currently only
+  // used for log messages (logs are re-scanned based on file size, rather than modtime).
   private var lastScanTime = -1L
 
   // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
@@ -87,6 +110,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
+  val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]()
+
   // List of application logs to be deleted by event log cleaner.
   private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
@@ -176,18 +201,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // Disable the background thread during tests.
     if (!conf.contains("spark.testing")) {
       // A task that periodically checks for event log updates on disk.
+      logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
       pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
 
       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
         // A task that periodically cleans event logs on disk.
         pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
       }
+    } else {
+      logDebug("Background update thread disabled for testing")
     }
   }
 
   override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
 
-  override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
+  override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
     try {
       applications.get(appId).flatMap { appInfo =>
         appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
@@ -210,7 +238,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
             ui.getSecurityManager.setViewAcls(attempt.sparkUser,
               appListener.viewAcls.getOrElse(""))
-            ui
+            LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))
           }
         }
       }
@@ -243,12 +271,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private[history] def checkForLogs(): Unit = {
     try {
       val newLastScanTime = getNewLastScanTime()
+      logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
         .getOrElse(Seq[FileStatus]())
+      // scan for modified applications, replay and merge them
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
           try {
-            !entry.isDirectory() && (entry.getModificationTime() >= lastScanTime)
+            val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+            !entry.isDirectory() && prevFileSize < entry.getLen()
           } catch {
             case e: AccessControlException =>
               // Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -262,6 +293,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           entry1.getModificationTime() >= entry2.getModificationTime()
       }
 
+      if (logInfos.nonEmpty) {
+        logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
+      }
       logInfos.grouped(20)
         .map { batch =>
           replayExecutor.submit(new Runnable {
@@ -356,7 +390,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         val bus = new ReplayListenerBus()
         val res = replay(fileStatus, bus)
         res match {
-          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
+          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully: $r")
           case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
             "The application may have not started.")
         }
@@ -511,6 +545,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
+    // Note that the eventLog may have *increased* in size between when we grabbed the FileStatus
+    // and when we read the file here. That is OK -- it may result in an unnecessary refresh
+    // when there is no update, but will not result in missing an update. We *must* prevent
+    // an error the other way -- if we report a size bigger (i.e. later) than the file that is
+    // actually read, we may never refresh the app. FileStatus is guaranteed to be static
+    // after it's created, so we get a file size that is no bigger than what is actually read.
     val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
       val appListener = new ApplicationEventListener
@@ -521,7 +561,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
       // try to show their UI.
       if (appListener.appId.isDefined) {
-        Some(new FsApplicationAttemptInfo(
+        val attemptInfo = new FsApplicationAttemptInfo(
           logPath.getName(),
           appListener.appName.getOrElse(NOT_STARTED),
           appListener.appId.getOrElse(logPath.getName()),
@@ -530,7 +570,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           appListener.endTime.getOrElse(-1L),
           eventLog.getModificationTime(),
           appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted))
+          appCompleted,
+          eventLog.getLen()
+        )
+        fileToAppInfo(logPath) = attemptInfo
+        Some(attemptInfo)
       } else {
         None
       }
@@ -564,12 +608,77 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
   }
 
+  /**
+   * String description for diagnostics
+   * @return a summary of the component state
+   */
+  override def toString: String = {
+    val header = s"""
+      | FsHistoryProvider: logdir=$logDir,
+      | last scan time=$lastScanTime
+      | Cached application count=${applications.size}
+    """.stripMargin
+    val sb = new StringBuilder(header)
+    applications.foreach(entry => sb.append(entry._2).append("\n"))
+    sb.toString
+  }
+
+  /**
+   * Look up an application attempt
+   * @param appId application ID
+   * @param attemptId Attempt ID, if set
+   * @return the matching attempt, if found
+   */
+  def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = {
+    applications.get(appId).flatMap { appInfo =>
+      appInfo.attempts.find(_.attemptId == attemptId)
+    }
+  }
+
+  /**
+   * Return true iff a newer version of the UI is available.  The check is based on whether the
+   * fileSize for the currently loaded UI is smaller than the file size the last time
+   * the logs were loaded.
+   *
+   * This is a very cheap operation -- the work of loading the new attempt was already done
+   * by [[checkForLogs]].
+   * @param appId application to probe
+   * @param attemptId attempt to probe
+   * @param prevFileSize the file size of the logs for the currently displayed UI
+   */
+  private def updateProbe(
+      appId: String,
+      attemptId: Option[String],
+      prevFileSize: Long)(): Boolean = {
+    lookup(appId, attemptId) match {
+      case None =>
+        logDebug(s"Application Attempt $appId/$attemptId not found")
+        false
+      case Some(latest) =>
+        prevFileSize < latest.fileSize
+    }
+  }
 }
 
 private[history] object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
 }
 
+/**
+ * Application attempt information.
+ *
+ * @param logPath path to the log file, or, for a legacy log, its directory
+ * @param name application name
+ * @param appId application ID
+ * @param attemptId optional attempt ID
+ * @param startTime start time (from playback)
+ * @param endTime end time (from playback). -1 if the application is incomplete.
+ * @param lastUpdated the modification time of the log file when this entry was built by replaying
+ *                    the history.
+ * @param sparkUser user running the application
+ * @param completed flag to indicate whether or not the application has completed.
+ * @param fileSize the size of the log file the last time the file was scanned for changes
+ */
 private class FsApplicationAttemptInfo(
     val logPath: String,
     val name: String,
@@ -579,10 +688,24 @@ private class FsApplicationAttemptInfo(
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
-    completed: Boolean = true)
+    completed: Boolean,
+    val fileSize: Long)
   extends ApplicationAttemptInfo(
-      attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+      attemptId, startTime, endTime, lastUpdated, sparkUser, completed) {
 
+  /** extend the superclass string value with the extra attributes of this class */
+  override def toString: String = {
+    s"FsApplicationAttemptInfo($name, $appId," +
+      s" ${super.toString}, source=$logPath, size=$fileSize)"
+  }
+}
+
+/**
+ * Application history information
+ * @param id application ID
+ * @param name application name
+ * @param attempts list of attempts, most recent first.
+ */
 private class FsApplicationHistoryInfo(
     id: String,
     override val name: String,
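
The size-based detection described in the FsHistoryProvider scaladoc above can be sketched in isolation. This is a minimal illustration, not the code from this commit: `AttemptSize` and `SizeTrackingListing` are hypothetical stand-ins for `FsApplicationAttemptInfo` and the provider's listing, and `recordScan` only imitates the "has the log grown?" filter in `checkForLogs`. It assumes, as the commit does, that event logs only ever grow.

```scala
import scala.collection.mutable

// Hypothetical stand-in for FsApplicationAttemptInfo: only the fields the probe needs.
final case class AttemptSize(appId: String, attemptId: Option[String], fileSize: Long)

// Hypothetical stand-in for the provider's application listing.
class SizeTrackingListing {
  private val attempts = mutable.HashMap.empty[(String, Option[String]), AttemptSize]

  // Imitates a scan: the attempt is (re)recorded only if the log has grown.
  // Returns true when the entry was treated as new or updated.
  def recordScan(appId: String, attemptId: Option[String], len: Long): Boolean = {
    val key = (appId, attemptId)
    val prev = attempts.get(key).map(_.fileSize).getOrElse(0L)
    if (prev < len) {
      attempts(key) = AttemptSize(appId, attemptId, len)
      true
    } else {
      false
    }
  }

  // Curried like updateProbe in the diff: the caller fixes the size it loaded,
  // and the empty parameter list is applied later, each time the cache probes.
  def updateProbe(appId: String, attemptId: Option[String], prevFileSize: Long)(): Boolean =
    attempts.get((appId, attemptId)).exists(prevFileSize < _.fileSize)
}
```

A caller would capture the probe once, at load time, for example `val probe: () => Boolean = () => listing.updateProbe("app-1", None, 100L)()`, and invoke it on later requests. It returns false until a subsequent scan records a larger size, and false for an attempt that is not in the listing.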

http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7dcf6/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index cab7fae..2fad112 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -30,7 +30,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
       Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
 
     val allApps = parent.getApplicationList()
-      .filter(_.attempts.head.completed != requestedIncomplete)
+      .filter(_.completed != requestedIncomplete)
     val allAppsSize = allApps.size
 
     val providerConfig = parent.getProviderConfig()

http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7dcf6/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 1f13d7d..076bdc5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -23,7 +23,6 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
 import scala.util.control.NonFatal
 
-import com.google.common.cache._
 import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -31,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.{ShutdownHookManager, SystemClock, Utils}
 
 /**
  * A web server that renders SparkUIs of completed applications.
@@ -50,31 +49,16 @@ class HistoryServer(
     securityManager: SecurityManager,
     port: Int)
   extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf)
-  with Logging with UIRoot {
+  with Logging with UIRoot with ApplicationCacheOperations {
 
   // How many applications to retain
   private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
 
-  private val appLoader = new CacheLoader[String, SparkUI] {
-    override def load(key: String): SparkUI = {
-      val parts = key.split("/")
-      require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
-      val ui = provider
-        .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
-        .getOrElse(throw new NoSuchElementException(s"no app with key $key"))
-      attachSparkUI(ui)
-      ui
-    }
-  }
+  // the application cache
+  private val appCache = new ApplicationCache(this, retainedApplications, new SystemClock())
 
-  private val appCache = CacheBuilder.newBuilder()
-    .maximumSize(retainedApplications)
-    .removalListener(new RemovalListener[String, SparkUI] {
-      override def onRemoval(rm: RemovalNotification[String, SparkUI]): Unit = {
-        detachSparkUI(rm.getValue())
-      }
-    })
-    .build(appLoader)
+  // and its metrics, for testing as well as monitoring
+  val cacheMetrics = appCache.metrics
 
   private val loaderServlet = new HttpServlet {
     protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
@@ -117,17 +101,7 @@ class HistoryServer(
   }
 
   def getSparkUI(appKey: String): Option[SparkUI] = {
-    try {
-      val ui = appCache.get(appKey)
-      Some(ui)
-    } catch {
-      case NonFatal(e) => e.getCause() match {
-        case nsee: NoSuchElementException =>
-          None
-
-        case cause: Exception => throw cause
-      }
-    }
+    appCache.getSparkUI(appKey)
   }
 
   initialize()
@@ -160,22 +134,37 @@ class HistoryServer(
   override def stop() {
     super.stop()
     provider.stop()
+    appCache.stop()
   }
 
   /** Attach a reconstructed UI to this server. Only valid after bind(). */
-  private def attachSparkUI(ui: SparkUI) {
+  override def attachSparkUI(
+      appId: String,
+      attemptId: Option[String],
+      ui: SparkUI,
+      completed: Boolean) {
     assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
     ui.getHandlers.foreach(attachHandler)
     addFilters(ui.getHandlers, conf)
   }
 
   /** Detach a reconstructed UI from this server. Only valid after bind(). */
-  private def detachSparkUI(ui: SparkUI) {
+  override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = {
     assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs")
     ui.getHandlers.foreach(detachHandler)
   }
 
   /**
+   * Get the application UI and whether or not it is completed
+   * @param appId application ID
+   * @param attemptId attempt ID
+   * @return If found, the Spark UI and any history information to be used in the cache
+   */
+  override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
+    provider.getAppUI(appId, attemptId)
+  }
+
+  /**
    * Returns a list of available applications, in descending order according to their end time.
    *
    * @return List of all known applications.
@@ -202,9 +191,15 @@ class HistoryServer(
    */
   def getProviderConfig(): Map[String, String] = provider.getConfig()
 
+  /**
+   * Load an application UI and attach it to the web server.
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @return true if the application was found and loaded.
+   */
   private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = {
     try {
-      appCache.get(appId + attemptId.map { id => s"/$id" }.getOrElse(""))
+      appCache.get(appId, attemptId)
       true
     } catch {
       case NonFatal(e) => e.getCause() match {
@@ -216,6 +211,17 @@ class HistoryServer(
     }
   }
 
+  /**
+   * String value for diagnostics.
+   * @return a multi-line description of the server state.
+   */
+  override def toString: String = {
+    s"""
+      | History Server;
+      | provider = $provider
+      | cache = $appCache
+    """.stripMargin
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7dcf6/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 01fee46..8354e2a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -224,6 +224,13 @@ private[spark] class EventLoggingListener(
       }
     }
     fileSystem.rename(new Path(logPath + IN_PROGRESS), target)
+    // touch file to ensure modtime is current across those filesystems where rename()
+    // does not set it, and which support setTimes(); it's a no-op on most object stores
+    try {
+      fileSystem.setTimes(target, System.currentTimeMillis(), -1)
+    } catch {
+      case e: Exception => logDebug(s"failed to set time of $target", e)
+    }
   }
 
 }
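
The rename-then-touch pattern in the hunk above can be tried on a local filesystem. The sketch below is an analogue only: it uses `java.nio.file` rather than the Hadoop `FileSystem` API used in the commit, and `TouchAfterRename.renameAndTouch` is a hypothetical helper name. As in the diff, a failure to set the time is reported and otherwise ignored.

```scala
import java.nio.file.{Files, Path}
import java.nio.file.attribute.FileTime
import scala.util.control.NonFatal

object TouchAfterRename {
  // Hypothetical helper: rename the in-progress file, then make a best-effort
  // attempt to bring the target's modification time up to date.
  def renameAndTouch(src: Path, target: Path, nowMillis: Long): Unit = {
    Files.move(src, target)
    try {
      Files.setLastModifiedTime(target, FileTime.fromMillis(nowMillis))
    } catch {
      // Mirrors the diff: the touch is optional, so a failure is only reported.
      case NonFatal(e) => Console.err.println(s"failed to set time of $target: $e")
    }
  }
}
```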

http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7dcf6/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
new file mode 100644
index 0000000..de6680c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+import javax.servlet.Filter
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar with Matchers {
+
+  /**
+   * subclass with access to the cache internals
+   * @param retainedApplications number of retained applications
+   */
+  class TestApplicationCache(
+      operations: ApplicationCacheOperations = new StubCacheOperations(),
+      retainedApplications: Int,
+      clock: Clock = new ManualClock(0))
+      extends ApplicationCache(operations, retainedApplications, clock) {
+
+    def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+  }
+
+  /**
+   * Stub cache operations.
+   * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
+   * with the `probeTime` field of each cache entry serving as its timestamp.
+   */
+  class StubCacheOperations extends ApplicationCacheOperations with Logging {
+
+    /** map to UI instances, including timestamps, which are used in update probes */
+    val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+    /** Map of attached spark UIs */
+    val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+    var getAppUICount = 0L
+    var attachCount = 0L
+    var detachCount = 0L
+    var updateProbeCount = 0L
+
+    override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
+      logDebug(s"getAppUI($appId, $attemptId)")
+      getAppUICount += 1
+      instances.get(CacheKey(appId, attemptId)).map( e =>
+        LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime)))
+    }
+
+    override def attachSparkUI(
+        appId: String,
+        attemptId: Option[String],
+        ui: SparkUI,
+        completed: Boolean): Unit = {
+      logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+      attachCount += 1
+      attached += (CacheKey(appId, attemptId) -> ui)
+    }
+
+    def putAndAttach(
+        appId: String,
+        attemptId: Option[String],
+        completed: Boolean,
+        started: Long,
+        ended: Long,
+        timestamp: Long): SparkUI = {
+      val ui = putAppUI(appId, attemptId, completed, started, ended, timestamp)
+      attachSparkUI(appId, attemptId, ui, completed)
+      ui
+    }
+
+    def putAppUI(
+        appId: String,
+        attemptId: Option[String],
+        completed: Boolean,
+        started: Long,
+        ended: Long,
+        timestamp: Long): SparkUI = {
+      val ui = newUI(appId, attemptId, completed, started, ended)
+      putInstance(appId, attemptId, ui, completed, timestamp)
+      ui
+    }
+
+    def putInstance(
+        appId: String,
+        attemptId: Option[String],
+        ui: SparkUI,
+        completed: Boolean,
+        timestamp: Long): Unit = {
+      instances += (CacheKey(appId, attemptId) ->
+          new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp))
+    }
+
+    /**
+     * Detach a reconstructed UI
+     *
+     * @param ui Spark UI
+     */
+    override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = {
+      logDebug(s"detachSparkUI($appId, $attemptId, $ui)")
+      detachCount += 1
+      val name = ui.getAppName
+      val key = CacheKey(appId, attemptId)
+      attached.getOrElse(key, { throw new java.util.NoSuchElementException() })
+      attached -= key
+    }
+
+    /**
+     * Lookup from the internal cache of attached UIs
+     */
+    def getAttached(appId: String, attemptId: Option[String]): Option[SparkUI] = {
+      attached.get(CacheKey(appId, attemptId))
+    }
+
+    /**
+     * The update probe.
+     * @param appId application to probe
+     * @param attemptId attempt to probe
+     * @param updateTime timestamp of this UI load
+     */
+    private[history] def updateProbe(
+        appId: String,
+        attemptId: Option[String],
+        updateTime: Long)(): Boolean = {
+      updateProbeCount += 1
+      logDebug(s"isUpdated($appId, $attemptId, ${updateTime})")
+      val entry = instances.get(CacheKey(appId, attemptId)).get
+      val updated = entry.probeTime > updateTime
+      logDebug(s"entry = $entry; updated = $updated")
+      updated
+    }
+  }
+
+  /**
+   * Create a new UI. The info/attempt info classes here are from the package
+   * `org.apache.spark.status.api.v1`, not the near-equivalents from the history package
+   */
+  def newUI(
+      name: String,
+      attemptId: Option[String],
+      completed: Boolean,
+      started: Long,
+      ended: Long): SparkUI = {
+    val info = new ApplicationInfo(name, name, Some(1), Some(1), Some(1), Some(64),
+      Seq(new AttemptInfo(attemptId, new Date(started), new Date(ended),
+        new Date(ended), ended - started, "user", completed)))
+    val ui = mock[SparkUI]
+    when(ui.getApplicationInfoList).thenReturn(List(info).iterator)
+    when(ui.getAppName).thenReturn(name)
+    when(ui.appName).thenReturn(name)
+    val handler = new ServletContextHandler()
+    when(ui.getHandlers).thenReturn(Seq(handler))
+    ui
+  }
+
+  /**
+   * Test operations on completed UIs: they are loaded on demand, entries
+   * are removed on overload.
+   *
+   * This effectively tests the original behavior of the history server's cache.
+   */
+  test("Completed UI get") {
+    val operations = new StubCacheOperations()
+    val clock = new ManualClock(1)
+    implicit val cache = new ApplicationCache(operations, 2, clock)
+    val metrics = cache.metrics
+    // cache misses
+    val app1 = "app-1"
+    assertNotFound(app1, None)
+    assertMetric("lookupCount", metrics.lookupCount, 1)
+    assertMetric("lookupFailureCount", metrics.lookupFailureCount, 1)
+    assert(1 === operations.getAppUICount, "getAppUICount")
+    assertNotFound(app1, None)
+    assert(2 === operations.getAppUICount, "getAppUICount")
+    assert(0 === operations.attachCount, "attachCount")
+
+    val now = clock.getTimeMillis()
+    // add the entry
+    operations.putAppUI(app1, None, true, now, now, now)
+
+    // make sure it's local
+    operations.getAppUI(app1, None).get
+    operations.getAppUICount = 0
+    // now expect it to be found
+    val cacheEntry = cache.lookupCacheEntry(app1, None)
+    assert(1 === cacheEntry.probeTime)
+    assert(cacheEntry.completed)
+    // assert about queries made of the operations
+    assert(1 === operations.getAppUICount, "getAppUICount")
+    assert(1 === operations.attachCount, "attachCount")
+
+    // and in the map of attached
+    assert(operations.getAttached(app1, None).isDefined, s"attached entry '1' from $cache")
+
+    // go forward in time
+    clock.setTime(10)
+    val time2 = clock.getTimeMillis()
+    val cacheEntry2 = cache.get(app1)
+    // no more refresh as this is a completed app
+    assert(1 === operations.getAppUICount, "getAppUICount")
+    assert(0 === operations.updateProbeCount, "updateProbeCount")
+    assert(0 === operations.detachCount, "detachCount")
+
+    // evict the entry
+    operations.putAndAttach("2", None, true, time2, time2, time2)
+    operations.putAndAttach("3", None, true, time2, time2, time2)
+    cache.get("2")
+    cache.get("3")
+
+    // there should have been a detachment here
+    assert(1 === operations.detachCount, s"detach count from $cache")
+    // and entry app1 no longer attached
+    assert(operations.getAttached(app1, None).isEmpty, s"get($app1) in $cache")
+    val appId = "app1"
+    val attemptId = Some("_01")
+    val time3 = clock.getTimeMillis()
+    operations.putAppUI(appId, attemptId, false, time3, 0, time3)
+    // expect an error here
+    assertNotFound(appId, None)
+  }
+
+  test("Test that if an attempt ID is set, it must be used in lookups") {
+    val operations = new StubCacheOperations()
+    val clock = new ManualClock(1)
+    implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock)
+    val appId = "app1"
+    val attemptId = Some("_01")
+    operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0, 0)
+    assertNotFound(appId, None)
+  }
+
+  /**
+   * Test that incomplete apps are not probed for updates during the time window,
+   * but that they are checked if that window has expired and they are not completed.
+   * Then, if they have changed, the old entry is replaced by a new one.
+   */
+  test("Incomplete apps refreshed") {
+    val operations = new StubCacheOperations()
+    val clock = new ManualClock(50)
+    val window = 500
+    implicit val cache = new ApplicationCache(operations, retainedApplications = 5, clock = clock)
+    val metrics = cache.metrics
+    // add the incomplete app
+    // add the entry
+    val started = clock.getTimeMillis()
+    val appId = "app1"
+    val attemptId = Some("001")
+    operations.putAppUI(appId, attemptId, false, started, 0, started)
+    val firstEntry = cache.lookupCacheEntry(appId, attemptId)
+    assert(started === firstEntry.probeTime, s"timestamp in $firstEntry")
+    assert(!firstEntry.completed, s"entry is complete: $firstEntry")
+    assertMetric("lookupCount", metrics.lookupCount, 1)
+
+    assert(0 === operations.updateProbeCount, "expected no update probe on that first get")
+
+    val checkTime = window * 2
+    clock.setTime(checkTime)
+    val entry3 = cache.lookupCacheEntry(appId, attemptId)
+    assert(firstEntry !== entry3, s"updated entry test from $cache")
+    assertMetric("lookupCount", metrics.lookupCount, 2)
+    assertMetric("updateProbeCount", metrics.updateProbeCount, 1)
+    assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 0)
+    assert(1 === operations.updateProbeCount, s"refresh count in $cache")
+    assert(0 === operations.detachCount, s"detach count")
+    assert(entry3.probeTime === checkTime)
+
+    val updateTime = window * 3
+    // update the cached value
+    val updatedApp = operations.putAppUI(appId, attemptId, true, started, updateTime, updateTime)
+    val endTime = window * 10
+    clock.setTime(endTime)
+    logDebug(s"Before operation = $cache")
+    val entry5 = cache.lookupCacheEntry(appId, attemptId)
+    assertMetric("lookupCount", metrics.lookupCount, 3)
+    assertMetric("updateProbeCount", metrics.updateProbeCount, 2)
+    // the update was triggered
+    assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 1)
+    assert(updatedApp === entry5.ui, s"UI {$updatedApp} did not match entry {$entry5} in $cache")
+
+    // at which point, the refreshes stop
+    clock.setTime(window * 20)
+    assertCacheEntryEquals(appId, attemptId, entry5)
+    assertMetric("updateProbeCount", metrics.updateProbeCount, 2)
+  }
+
+  /**
+   * Assert that a metric counter has a specific value; failure raises an exception
+   * including the cache's toString value
+   * @param name counter name (for exceptions)
+   * @param counter counter
+   * @param expected expected value.
+   * @param cache cache
+   */
+  def assertMetric(
+      name: String,
+      counter: Counter,
+      expected: Long)
+      (implicit cache: ApplicationCache): Unit = {
+    val actual = counter.getCount
+    if (actual != expected) {
+      // this is here because Scalatest loses stack depth
+      throw new Exception(s"Wrong $name value - expected $expected but got $actual in $cache")
+    }
+  }
+
+  /**
+   * Look up the cache entry and assert that it matches the expected value.
+   * This assertion works when the two CacheEntries are distinct objects; it compares fields.
+   * UIs are compared on object equality; the timestamp and completed flags directly.
+   * @param appId application ID
+   * @param attemptId attempt ID
+   * @param expected expected value
+   * @param cache app cache
+   */
+  def assertCacheEntryEquals(
+      appId: String,
+      attemptId: Option[String],
+      expected: CacheEntry)
+      (implicit cache: ApplicationCache): Unit = {
+    val actual = cache.lookupCacheEntry(appId, attemptId)
+    val errorText = s"Expected get($appId, $attemptId) -> $expected, but got $actual from $cache"
+    assert(expected.ui === actual.ui, errorText + " SparkUI reference")
+    assert(expected.completed === actual.completed, errorText + " -completed flag")
+    assert(expected.probeTime === actual.probeTime, errorText + " -timestamp")
+  }
+
+  /**
+   * Assert that a key wasn't found in cache or loaded.
+   *
+   * Looks for the specific nested exception raised by [[ApplicationCache]]
+   * @param appId application ID
+   * @param attemptId attempt ID
+   * @param cache app cache
+   */
+  def assertNotFound(
+      appId: String,
+      attemptId: Option[String])
+      (implicit cache: ApplicationCache): Unit = {
+    val ex = intercept[UncheckedExecutionException] {
+      cache.get(appId, attemptId)
+    }
+    val cause = ex.getCause
+    assert(cause !== null)
+    if (!cause.isInstanceOf[NoSuchElementException]) {
+      throw cause
+    }
+  }
+
+  test("Large Scale Application Eviction") {
+    val operations = new StubCacheOperations()
+    val clock = new ManualClock(0)
+    val size = 5
+    // only `size` entries are retained, so we expect evictions to occur on lookups
+    implicit val cache: ApplicationCache = new TestApplicationCache(operations,
+      retainedApplications = size, clock = clock)
+
+    val attempt1 = Some("01")
+
+    val ids = new ListBuffer[String]()
+    // build a list of applications
+    val count = 100
+    for (i <- 1 to count ) {
+      val appId = f"app-$i%04d"
+      ids += appId
+      clock.advance(10)
+      val t = clock.getTimeMillis()
+      operations.putAppUI(appId, attempt1, true, t, t, t)
+    }
+    // now go through them in sequence reading them, expect evictions
+    ids.foreach { id =>
+      cache.get(id, attempt1)
+    }
+    logInfo(cache.toString)
+    val metrics = cache.metrics
+
+    assertMetric("loadCount", metrics.loadCount, count)
+    assertMetric("evictionCount", metrics.evictionCount, count - size)
+  }
+
+  test("Attempts are Evicted") {
+    val operations = new StubCacheOperations()
+    implicit val cache: ApplicationCache = new TestApplicationCache(operations,
+      retainedApplications = 4)
+    val metrics = cache.metrics
+    val appId = "app1"
+    val attempt1 = Some("01")
+    val attempt2 = Some("02")
+    val attempt3 = Some("03")
+    operations.putAppUI(appId, attempt1, true, 100, 110, 110)
+    operations.putAppUI(appId, attempt2, true, 200, 210, 210)
+    operations.putAppUI(appId, attempt3, true, 300, 310, 310)
+    val attempt4 = Some("04")
+    operations.putAppUI(appId, attempt4, true, 400, 410, 410)
+    val attempt5 = Some("05")
+    operations.putAppUI(appId, attempt5, true, 500, 510, 510)
+
+    def expectLoadAndEvictionCounts(expectedLoad: Int, expectedEvictionCount: Int): Unit = {
+      assertMetric("loadCount", metrics.loadCount, expectedLoad)
+      assertMetric("evictionCount", metrics.evictionCount, expectedEvictionCount)
+    }
+
+    // first entry
+    cache.get(appId, attempt1)
+    expectLoadAndEvictionCounts(1, 0)
+
+    // second
+    cache.get(appId, attempt2)
+    expectLoadAndEvictionCounts(2, 0)
+
+    // no change
+    cache.get(appId, attempt2)
+    expectLoadAndEvictionCounts(2, 0)
+
+    // eviction time
+    cache.get(appId, attempt3)
+    cache.size() should be(3)
+    cache.get(appId, attempt4)
+    expectLoadAndEvictionCounts(4, 0)
+    cache.get(appId, attempt5)
+    expectLoadAndEvictionCounts(5, 1)
+    cache.get(appId, attempt5)
+    expectLoadAndEvictionCounts(5, 1)
+
+  }
+
+  test("Instantiate Filter") {
+    // this is a regression test on the filter being constructable
+    val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME)
+    val instance = clazz.newInstance()
+    instance shouldBe a [Filter]
+  }
+
+  test("redirect includes query params") {
+    val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME)
+    val filter = clazz.newInstance().asInstanceOf[ApplicationCacheCheckFilter]
+    filter.appId = "local-123"
+    val cache = mock[ApplicationCache]
+    when(cache.checkForUpdates(any(), any())).thenReturn(true)
+    ApplicationCacheCheckFilterRelay.setApplicationCache(cache)
+    val request = mock[HttpServletRequest]
+    when(request.getMethod()).thenReturn("GET")
+    when(request.getRequestURI()).thenReturn("http://localhost:18080/history/local-123/jobs/job/")
+    when(request.getQueryString()).thenReturn("id=2")
+    val resp = mock[HttpServletResponse]
+    when(resp.encodeRedirectURL(any())).thenAnswer(new Answer[String](){
+      override def answer(invocationOnMock: InvocationOnMock): String = {
+        invocationOnMock.getArguments()(0).asInstanceOf[String]
+      }
+    })
+    filter.doFilter(request, resp, null)
+    verify(resp).sendRedirect("http://localhost:18080/history/local-123/jobs/job/?id=2")
+  }
+
+}
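
The eviction arithmetic asserted in "Large Scale Application Eviction" and "Attempts are Evicted" (each distinct key is loaded once, and every load beyond the retained size evicts one entry) can be illustrated with a minimal sketch. This is not Spark's ApplicationCache: `CountingLruCache`, `maxEntries`, and `load` are hypothetical names, and the sketch assumes a plain least-recently-used policy built on `java.util.LinkedHashMap`.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

/** Minimal LRU cache that counts loads and evictions (illustrative stand-in only). */
class CountingLruCache[K, V](maxEntries: Int, load: K => V) {
  var loadCount = 0L
  var evictionCount = 0L

  // accessOrder = true keeps the least recently accessed entry first
  private val entries = new JLinkedHashMap[K, V](16, 0.75f, true) {
    // called by LinkedHashMap after each insertion
    override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = {
      val evict = this.size() > maxEntries
      if (evict) evictionCount += 1
      evict
    }
  }

  /** Return the cached value, loading it on a miss. */
  def get(key: K): V = {
    if (!entries.containsKey(key)) {
      loadCount += 1
      entries.put(key, load(key))
    }
    entries.get(key)
  }

  def cachedEntries: Int = entries.size()
}
```

Reading 100 distinct keys through a cache with `maxEntries = 5` gives 100 loads and 95 evictions, the same `count - size` relationship the first test asserts; five distinct keys against a size of 4 give one eviction, as in the second test.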

http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7dcf6/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 40d0076..4b05469 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -21,16 +21,28 @@ import java.net.{HttpURLConnection, URL}
 import java.util.zip.ZipInputStream
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
 import com.google.common.base.Charsets
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.{FileUtils, IOUtils}
-import org.mockito.Mockito.when
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods
+import org.json4s.jackson.JsonMethods._
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
 import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.concurrent.Eventually
 import org.scalatest.mock.MockitoSugar
+import org.scalatest.selenium.WebBrowser
 
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.ui.{SparkUI, UIUtils}
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.UIData.JobUIData
+import org.apache.spark.util.{ResetSystemProperties, Utils}
 
 /**
  * A collection of tests against the historyserver, including comparing responses from the json
@@ -44,7 +56,8 @@ import org.apache.spark.util.ResetSystemProperties
  * are considered part of Spark's public api.
  */
 class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers with MockitoSugar
-  with JsonTestUtils with ResetSystemProperties {
+  with JsonTestUtils with Eventually with WebBrowser with LocalSparkContext
+  with ResetSystemProperties {
 
   private val logDir = new File("src/test/resources/spark-events")
   private val expRoot = new File("src/test/resources/HistoryServerExpectations/")
@@ -56,7 +69,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
   def init(): Unit = {
     val conf = new SparkConf()
       .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
-      .set("spark.history.fs.updateInterval", "0")
+      .set("spark.history.fs.update.interval", "0")
       .set("spark.testing", "true")
     provider = new FsHistoryProvider(conf)
     provider.checkForLogs()
@@ -256,6 +269,204 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     all (siteRelativeLinks) should startWith (uiRoot)
   }
 
+  test("incomplete apps get refreshed") {
+
+    implicit val webDriver: WebDriver = new HtmlUnitDriver
+    implicit val formats = org.json4s.DefaultFormats
+
+    // this test dir is explicitly deleted on successful runs; retained for diagnostics when
+    // not
+    val logDir = Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+    // a new conf is used with the background thread set and running at its fastest
+    // allowed refresh rate (1Hz)
+    val myConf = new SparkConf()
+      .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+      .set("spark.eventLog.dir", logDir.getAbsolutePath)
+      .set("spark.history.fs.update.interval", "1s")
+      .set("spark.eventLog.enabled", "true")
+      .set("spark.history.cache.window", "250ms")
+      .remove("spark.testing")
+    val provider = new FsHistoryProvider(myConf)
+    val securityManager = new SecurityManager(myConf)
+
+    sc = new SparkContext("local", "test", myConf)
+    val logDirUri = logDir.toURI
+    val logDirPath = new Path(logDirUri)
+    val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+    def listDir(dir: Path): Seq[FileStatus] = {
+      val statuses = fs.listStatus(dir)
+      statuses.flatMap(
+        stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+    }
+
+    def dumpLogDir(msg: String = ""): Unit = {
+      if (log.isDebugEnabled) {
+        logDebug(msg)
+        listDir(logDirPath).foreach { status =>
+          val s = status.toString
+          logDebug(s)
+        }
+      }
+    }
+
+    // stop the server with the old config, and start the new one
+    server.stop()
+    server = new HistoryServer(myConf, provider, securityManager, 18080)
+    server.initialize()
+    server.bind()
+    val port = server.boundPort
+    val metrics = server.cacheMetrics
+
+    // assert that a metric has a value; if not dump the whole metrics instance
+    def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
+      val actual = counter.getCount
+      if (actual != expected) {
+        // this is here because Scalatest loses stack depth
+        fail(s"Wrong $name value - expected $expected but got $actual" +
+            s" in metrics\n$metrics")
+      }
+    }
+
+    // build a URL for an app or app/attempt plus a page underneath
+    def buildURL(appId: String, suffix: String): URL = {
+      new URL(s"http://localhost:$port/history/$appId$suffix")
+    }
+
+    // build a rest URL for the application and suffix.
+    def applications(appId: String, suffix: String): URL = {
+      new URL(s"http://localhost:$port/api/v1/applications/$appId$suffix")
+    }
+
+    val historyServerRoot = new URL(s"http://localhost:$port/")
+
+    // start initial job
+    val d = sc.parallelize(1 to 10)
+    d.count()
+    val stdInterval = interval(100 milliseconds)
+    val appId = eventually(timeout(20 seconds), stdInterval) {
+      val json = getContentAndCode("applications", port)._2.get
+      val apps = parse(json).asInstanceOf[JArray].arr
+      apps should have size 1
+      (apps.head \ "id").extract[String]
+    }
+
+    val appIdRoot = buildURL(appId, "")
+    val rootAppPage = HistoryServerSuite.getUrl(appIdRoot)
+    logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage")
+    // sanity check to make sure filter is chaining calls
+    rootAppPage should not be empty
+
+    def getAppUI: SparkUI = {
+      provider.getAppUI(appId, None).get.ui
+    }
+
+    // selenium isn't that useful on failures...add our own reporting
+    def getNumJobs(suffix: String): Int = {
+      val target = buildURL(appId, suffix)
+      val targetBody = HistoryServerSuite.getUrl(target)
+      try {
+        go to target.toExternalForm
+        findAll(cssSelector("tbody tr")).toIndexedSeq.size
+      } catch {
+        case ex: Exception =>
+          throw new Exception(s"Against $target\n$targetBody", ex)
+      }
+    }
+    // use REST API to get #of jobs
+    def getNumJobsRestful(): Int = {
+      val json = HistoryServerSuite.getUrl(applications(appId, "/jobs"))
+      val jsonAst = parse(json)
+      val jobList = jsonAst.asInstanceOf[JArray]
+      jobList.values.size
+    }
+
+    // get a list of app Ids of all apps in a given state. REST API
+    def listApplications(completed: Boolean): Seq[String] = {
+      val json = parse(HistoryServerSuite.getUrl(applications("", "")))
+      logDebug(s"${JsonMethods.pretty(json)}")
+      json match {
+        case JNothing => Seq()
+        case apps: JArray =>
+          apps.filter(app => {
+            (app \ "attempts") match {
+              case attempts: JArray =>
+                val state = (attempts.children.head \ "completed").asInstanceOf[JBool]
+                state.value == completed
+              case _ => false
+            }
+          }).map(app => (app \ "id").asInstanceOf[JString].values)
+        case _ => Seq()
+      }
+    }
+
+    def completedJobs(): Seq[JobUIData] = {
+      getAppUI.jobProgressListener.completedJobs
+    }
+
+    def activeJobs(): Seq[JobUIData] = {
+      getAppUI.jobProgressListener.activeJobs.values.toSeq
+    }
+
+    activeJobs() should have size 0
+    completedJobs() should have size 1
+    getNumJobs("") should be (1)
+    getNumJobs("/jobs") should be (1)
+    getNumJobsRestful() should be (1)
+    assert(metrics.lookupCount.getCount > 1, s"lookup count too low in $metrics")
+
+    // dump state before the next bit of test, which is where update
+    // checking really gets stressed
+    dumpLogDir("filesystem before executing second job")
+    logDebug(s"History Server: $server")
+
+    val d2 = sc.parallelize(1 to 10)
+    d2.count()
+    dumpLogDir("After second job")
+
+    val stdTimeout = timeout(10 seconds)
+    logDebug("waiting for UI to update")
+    eventually(stdTimeout, stdInterval) {
+      assert(2 === getNumJobs(""),
+        s"jobs not updated, server=$server\n dir = ${listDir(logDirPath)}")
+      assert(2 === getNumJobs("/jobs"),
+        s"job count under /jobs not updated, server=$server\n dir = ${listDir(logDirPath)}")
+      getNumJobsRestful() should be(2)
+    }
+
+    d.count()
+    d.count()
+    eventually(stdTimeout, stdInterval) {
+      assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n")
+    }
+    val jobcount = getNumJobs("/jobs")
+    assert(!provider.getListing().head.completed)
+
+    listApplications(false) should contain(appId)
+
+    // stop the spark context
+    resetSparkContext()
+    // check the app is now found as completed
+    eventually(stdTimeout, stdInterval) {
+      assert(provider.getListing().head.completed,
+        s"application never completed, server=$server\n")
+    }
+
+    // app becomes observably complete
+    eventually(stdTimeout, stdInterval) {
+      listApplications(true) should contain (appId)
+    }
+    // app is no longer incomplete
+    listApplications(false) should not contain(appId)
+
+    assert(jobcount === getNumJobs("/jobs"))
+
+    // no need to retain the test dir now the tests complete
+    logDir.deleteOnExit();
+
+  }
+
   def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = {
     HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/api/v1/$path"))
   }
@@ -275,6 +486,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     out.write(json)
     out.close()
   }
+
 }
 
 object HistoryServerSuite {
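
The "incomplete apps get refreshed" test relies on ScalaTest's `eventually(timeout, interval)` to re-check the job count until the history server has picked up the new events. The underlying poll-until-deadline pattern can be sketched in plain Scala as follows. `Poll.until` is a hypothetical helper, not part of ScalaTest or Spark, and unlike `eventually` it returns an `Option` instead of rethrowing the last failure.

```scala
import scala.annotation.tailrec

object Poll {
  /**
   * Re-evaluate `probe` until its result satisfies `done` or `timeoutMs` has elapsed.
   * Returns Some(value) on success and None on timeout.
   */
  def until[T](timeoutMs: Long, intervalMs: Long)(probe: () => T)(done: T => Boolean): Option[T] = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L

    @tailrec
    def loop(): Option[T] = {
      val value = probe()
      if (done(value)) Some(value)
      else if (System.nanoTime() - deadline >= 0) None // deadline passed
      else {
        Thread.sleep(intervalMs)
        loop()
      }
    }

    loop()
  }
}
```

With a helper like this, a check such as "the REST job count reaches 2 within 10 seconds, probing every 100 ms" becomes `Poll.until(10000, 100)(() => getNumJobsRestful())(_ == 2)`, where `getNumJobsRestful` is the helper defined in the test above.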

http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7dcf6/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index cedceb2..c37f6fb 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -38,11 +38,25 @@ You can start the history server by executing:
 
     ./sbin/start-history-server.sh
 
-When using the file-system provider class (see spark.history.provider below), the base logging
-directory must be supplied in the <code>spark.history.fs.logDirectory</code> configuration option,
-and should contain sub-directories that each represents an application's event logs. This creates a
-web interface at `http://<server-url>:18080` by default. The history server can be configured as
-follows:
+This creates a web interface at `http://<server-url>:18080` by default, listing incomplete
+and completed applications and attempts, and allowing them to be viewed.
+
+When using the file-system provider class (see `spark.history.provider` below), the base logging
+directory must be supplied in the `spark.history.fs.logDirectory` configuration option,
+and should contain sub-directories that each represents an application's event logs.
+ 
+The Spark jobs themselves must be configured to log events, and to log them to the same shared,
+writeable directory. For example, if the server was configured with a log directory of
+`hdfs://namenode/shared/spark-logs`, then the client-side options would be:
+
+```
+spark.eventLog.enabled true
+spark.eventLog.dir hdfs://namenode/shared/spark-logs
+```
+ 
+The history server can be configured as follows:
+
+### Environment Variables
 
 <table class="table">
   <tr><th style="width:21%">Environment Variable</th><th>Meaning</th></tr>
@@ -69,11 +83,13 @@ follows:
   </tr>
 </table>
 
+### Spark configuration options
+
 <table class="table">
   <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
   <tr>
     <td>spark.history.provider</td>
-    <td>org.apache.spark.deploy.history.FsHistoryProvider</td>
+    <td><code>org.apache.spark.deploy.history.FsHistoryProvider</code></td>
     <td>Name of the class implementing the application history backend. Currently there is only
     one implementation, provided by Spark, which looks for application logs stored in the
     file system.</td>
@@ -82,15 +98,21 @@ follows:
     <td>spark.history.fs.logDirectory</td>
     <td>file:/tmp/spark-events</td>
     <td>
-     Directory that contains application event logs to be loaded by the history server
+    For the filesystem history provider, the URL to the directory containing application event
+    logs to load. This can be a local <code>file://</code> path,
+    an HDFS path <code>hdfs://namenode/shared/spark-logs</code>
+    or that of an alternative filesystem supported by the Hadoop APIs.
     </td>
   </tr>
   <tr>
     <td>spark.history.fs.update.interval</td>
     <td>10s</td>
     <td>
-      The period at which information displayed by this history server is updated.
-      Each update checks for any changes made to the event logs in persisted storage.
+      The period at which the filesystem history provider checks for new or
+      updated logs in the log directory. A shorter interval detects new applications faster,
+      at the expense of more server load re-reading updated applications.
+      As soon as an update has completed, listings of the completed and incomplete applications
+      will reflect the changes.
     </td>
   </tr>
   <tr>
@@ -112,7 +134,7 @@ follows:
     <td>spark.history.kerberos.enabled</td>
     <td>false</td>
     <td>
-      Indicates whether the history server should use kerberos to login. This is useful
+      Indicates whether the history server should use kerberos to login. This is required
       if the history server is accessing HDFS files on a secure Hadoop cluster. If this is 
       true, it uses the configs <code>spark.history.kerberos.principal</code> and
       <code>spark.history.kerberos.keytab</code>. 
@@ -156,15 +178,15 @@ follows:
     <td>spark.history.fs.cleaner.interval</td>
     <td>1d</td>
     <td>
-      How often the job history cleaner checks for files to delete.
-      Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.
+      How often the filesystem job history cleaner checks for files to delete.
+      Files are only deleted if they are older than <code>spark.history.fs.cleaner.maxAge</code>.
     </td>
   </tr>
   <tr>
     <td>spark.history.fs.cleaner.maxAge</td>
     <td>7d</td>
     <td>
-      Job history files older than this will be deleted when the history cleaner runs.
+      Job history files older than this will be deleted when the filesystem history cleaner runs.
     </td>
   </tr>
 </table>
@@ -172,7 +194,25 @@ follows:
 Note that in all of these UIs, the tables are sortable by clicking their headers,
 making it easy to identify slow tasks, data skew, etc.
 
-Note that the history server only displays completed Spark jobs. One way to signal the completion of a Spark job is to stop the Spark Context explicitly (`sc.stop()`), or in Python using the `with SparkContext() as sc:` to handle the Spark Context setup and tear down, and still show the job history on the UI.
+Note
+
+1. The history server displays both completed and incomplete Spark jobs. If an application makes
+multiple attempts after failures, the failed attempts will be displayed, as well as any ongoing
+incomplete attempt or the final successful attempt.
+
+2. Incomplete applications are only updated intermittently. The time between updates is defined
+by the interval between checks for changed files (`spark.history.fs.update.interval`).
+On larger clusters the update interval may be set to large values.
+The way to view a running application is actually to view its own web UI.
+
+3. Applications which exited without registering themselves as completed will be listed
+as incomplete, even though they are no longer running. This can happen if an application
+crashes.
+
+4. One way to signal the completion of a Spark job is to stop the Spark Context
+explicitly (`sc.stop()`), or in Python using the `with SparkContext() as sc:` construct
+to handle the Spark Context setup and tear down.
+
 
 ## REST API
 
@@ -249,7 +289,7 @@ These endpoints have been strongly versioned to make it easier to develop applic
 * New endpoints may be added
 * New fields may be added to existing endpoints
 * New versions of the api may be added in the future at a separate endpoint (eg., `api/v2`).  New versions are *not* required to be backwards compatible.
-* Api versions may be dropped, but only after at least one minor release of co-existing with a new api version
+* Api versions may be dropped, but only after at least one minor release of co-existing with a new api version.
 
 Note that even when examining the UI of a running applications, the `applications/[app-id]` portion is
 still required, though there is only one application available.  Eg. to see the list of jobs for the
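
The documentation change above requires applications to log events into the same directory that the history server reads. A minimal sketch of a consistency check between the two configurations follows. `EventLogConfigCheck` and its methods are hypothetical names, the sketch assumes `spark-defaults.conf`-style "key value" lines, and it compares the directory strings literally without normalizing URIs or applying defaults.

```scala
import java.io.StringReader
import java.util.Properties

object EventLogConfigCheck {
  /** Parse "key value" lines; java.util.Properties accepts whitespace as the separator. */
  def parse(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  /** True if the application logs events into the directory the history server reads. */
  def logsVisibleToHistoryServer(app: Properties, server: Properties): Boolean = {
    val enabled = app.getProperty("spark.eventLog.enabled", "false").trim.toBoolean
    val appDir = Option(app.getProperty("spark.eventLog.dir")).map(_.trim)
    val serverDir = Option(server.getProperty("spark.history.fs.logDirectory")).map(_.trim)
    // assumption: a literal string match is enough for this illustration
    enabled && appDir.isDefined && appDir == serverDir
  }
}
```

Applied to the client-side options shown in the doc change and a server configured with `spark.history.fs.logDirectory hdfs://namenode/shared/spark-logs`, the check returns true; it returns false if event logging is left disabled or the directories differ.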

http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7dcf6/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1338947..8611106 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -233,6 +233,9 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.metadataCleaner"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint")
+      ) ++ Seq(
+        // SPARK-7889
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.org$apache$spark$deploy$history$HistoryServer$$attachSparkUI")
       )
     case v if v.startsWith("1.6") =>
       Seq(



