spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-18010][CORE] Reduce work performed for building up the application list for the History Server app list UI page
Date Tue, 25 Oct 2016 17:36:19 GMT
Repository: spark
Updated Branches:
  refs/heads/master ac8ff920f -> c5fe3dd4f


[SPARK-18010][CORE] Reduce work performed for building up the application list for the History Server app list UI page

## What changes were proposed in this pull request?
Allow `ReplayListenerBus` to skip deserializing and replaying certain events, using an
inexpensive check on the raw event log entry. Use this to ensure that when event log replay is
triggered for building the application list, the `ReplayListenerBus` skips over all but the few
events needed for that purpose. Refer to [SPARK-18010] for the motivation behind this change.
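
For illustration, here is a minimal, standalone sketch of the kind of cheap pre-parse check this
enables (names mirror the patch below): a `ReplayEventsFilter` is just a predicate on the raw
JSON line, so a string prefix test can reject an event before any JSON deserialization happens.

```scala
object ReplayFilterSketch {
  // Mirrors the ReplayEventsFilter type added to ReplayListenerBus in this patch.
  type ReplayEventsFilter = (String) => Boolean

  // Event log lines are JSON objects whose "Event" field is written first,
  // so a prefix check identifies the event type without parsing the JSON.
  val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
  val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""

  // Filter used when building the app listing: only app start/end events pass.
  val appListingFilter: ReplayEventsFilter = { eventString =>
    eventString.startsWith(APPL_START_EVENT_PREFIX) ||
      eventString.startsWith(APPL_END_EVENT_PREFIX)
  }

  def main(args: Array[String]): Unit = {
    val start = """{"Event":"SparkListenerApplicationStart","App Name":"demo"}"""
    val task = """{"Event":"SparkListenerTaskEnd","Stage ID":0}"""
    println(appListingFilter(start)) // true: parsed and replayed
    println(appListingFilter(task))  // false: skipped before deserialization
  }
}
```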

## How was this patch tested?

Tested with existing HistoryServer and ReplayListener unit test suites. All tests pass.


Author: Vinayak <vijoshi5@in.ibm.com>

Closes #15556 from vijoshi/SAAS-467_master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5fe3dd4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5fe3dd4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5fe3dd4

Branch: refs/heads/master
Commit: c5fe3dd4f59c464c830b414acccd3cca0fdd877c
Parents: ac8ff92
Author: Vinayak <vijoshi5@in.ibm.com>
Authored: Tue Oct 25 10:36:03 2016 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Tue Oct 25 10:36:03 2016 -0700

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 120 +++++++++++--------
 .../spark/scheduler/ReplayListenerBus.scala     |  39 ++++--
 2 files changed, 101 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c5fe3dd4/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 530cc52..dfc1aad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -36,6 +36,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
@@ -78,10 +79,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   import FsHistoryProvider._
 
-  private val NOT_STARTED = "<Not Started>"
-
-  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
-
   // Interval between safemode checks.
   private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
     "spark.history.fs.safemodeCheck.interval", "5s")
@@ -241,11 +238,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
               HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
             // Do not call ui.bind() to avoid creating a new server for each application
           }
-          val appListener = new ApplicationEventListener()
-          replayBus.addListener(appListener)
-          val appAttemptInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)),
-            replayBus)
-          appAttemptInfo.map { info =>
+
+          val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
+
+          val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
+
+          if (appListener.appId.isDefined) {
             val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
             ui.getSecurityManager.setAcls(uiAclsEnabled)
             // make sure to set admin acls before view acls so they are properly picked up
@@ -254,8 +252,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
               appListener.viewAcls.getOrElse(""))
             ui.getSecurityManager.setAdminAclsGroups(appListener.adminAclsGroups.getOrElse(""))
             ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
-            LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))
+            Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)))
+          } else {
+            None
           }
+
         }
       }
     } catch {
@@ -411,28 +412,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
   private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val newAttempts = try {
-        val bus = new ReplayListenerBus()
-        val res = replay(fileStatus, bus)
-        res match {
-          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully: $r")
-          case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
-            "The application may have not started.")
-        }
-        res
-      } catch {
-        case e: Exception =>
-          logError(
-            s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
-            e)
-          None
+      val eventsFilter: ReplayEventsFilter = { eventString =>
+        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+          eventString.startsWith(APPL_END_EVENT_PREFIX)
+      }
+
+      val logPath = fileStatus.getPath()
+
+      val appCompleted = isApplicationCompleted(fileStatus)
+
+      val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
+
+      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
+      // try to show their UI.
+      if (appListener.appId.isDefined) {
+        val attemptInfo = new FsApplicationAttemptInfo(
+          logPath.getName(),
+          appListener.appName.getOrElse(NOT_STARTED),
+          appListener.appId.getOrElse(logPath.getName()),
+          appListener.appAttemptId,
+          appListener.startTime.getOrElse(-1L),
+          appListener.endTime.getOrElse(-1L),
+          fileStatus.getModificationTime(),
+          appListener.sparkUser.getOrElse(NOT_STARTED),
+          appCompleted,
+          fileStatus.getLen()
+        )
+        fileToAppInfo(logPath) = attemptInfo
+        logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
+        Some(attemptInfo)
+      } else {
+        logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+          "The application may have not started.")
+        None
       }
 
+    } catch {
+      case e: Exception =>
+        logError(
+          s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
+          e)
+        None
+    }
+
     if (newAttempts.isEmpty) {
       return
     }
@@ -564,12 +591,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   /**
-   * Replays the events in the specified log file and returns information about the associated
-   * application. Return `None` if the application ID cannot be located.
+   * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns
+   * an `ApplicationEventListener` instance with event data captured from the replay.
+   * `ReplayEventsFilter` determines what events are replayed and can therefore limit the
+   * data captured in the returned `ApplicationEventListener` instance.
    */
   private def replay(
       eventLog: FileStatus,
-      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
+      appCompleted: Boolean,
+      bus: ReplayListenerBus,
+      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     // Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
@@ -581,30 +612,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
       val appListener = new ApplicationEventListener
-      val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
-      bus.replay(logInput, logPath.toString, !appCompleted)
-
-      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
-      // try to show their UI.
-      if (appListener.appId.isDefined) {
-        val attemptInfo = new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appListener.appId.getOrElse(logPath.getName()),
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          eventLog.getModificationTime(),
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted,
-          eventLog.getLen()
-        )
-        fileToAppInfo(logPath) = attemptInfo
-        Some(attemptInfo)
-      } else {
-        None
-      }
+      bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
+      appListener
     } finally {
       logInput.close()
     }
@@ -689,6 +699,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
 private[history] object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  private val NOT_STARTED = "<Not Started>"
+
+  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
+
+  private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
+
+  private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c5fe3dd4/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index d32f5eb..3eff8d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.util.JsonProtocol
 
 /**
@@ -43,30 +44,45 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
   * @param sourceName Filename (or other source identifier) from whence @logData is being read
    * @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations
    *        encountered, log file might not finished writing) or not
+   * @param eventsFilter Filter function to select JSON event strings in the log data stream that
+   *        should be parsed and replayed. When not specified, all event strings in the log data
+   *        are parsed and replayed.
    */
   def replay(
       logData: InputStream,
       sourceName: String,
-      maybeTruncated: Boolean = false): Unit = {
+      maybeTruncated: Boolean = false,
+      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+
     var currentLine: String = null
-    var lineNumber: Int = 1
+    var lineNumber: Int = 0
+
     try {
-      val lines = Source.fromInputStream(logData).getLines()
-      while (lines.hasNext) {
-        currentLine = lines.next()
+      val lineEntries = Source.fromInputStream(logData)
+        .getLines()
+        .zipWithIndex
+        .filter { case (line, _) => eventsFilter(line) }
+
+      while (lineEntries.hasNext) {
         try {
+          val entry = lineEntries.next()
+
+          currentLine = entry._1
+          lineNumber = entry._2 + 1
+
           postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
         } catch {
           case jpe: JsonParseException =>
             // We can only ignore exception from last line of the file that might be truncated
-            if (!maybeTruncated || lines.hasNext) {
+            // the last entry may not be the very last line in the event log, but we treat it
+            // as such in a best effort to replay the given input
+            if (!maybeTruncated || lineEntries.hasNext) {
               throw jpe
             } else {
               logWarning(s"Got JsonParseException from log file $sourceName" +
                 s" at line $lineNumber, the file might not have finished writing cleanly.")
             }
         }
-        lineNumber += 1
       }
     } catch {
       case ioe: IOException =>
@@ -78,3 +94,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
   }
 
 }
+
+
+private[spark] object ReplayListenerBus {
+
+  type ReplayEventsFilter = (String) => Boolean
+
+  // utility filter that selects all event logs during replay
+  val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }
+}
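
For context, here is a sketch (not part of this commit) of how a caller drives the new
`replay(...)` overload with a filter. Both `ReplayListenerBus` and `ApplicationEventListener`
are `private[spark]`, so this code would have to live inside Spark; the log path is hypothetical,
and the stream is assumed uncompressed (the History Server actually opens logs through
`EventLoggingListener.openEventLog`, which handles codecs).

```scala
import java.io.FileInputStream

import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus}
import org.apache.spark.scheduler.ReplayListenerBus._

val bus = new ReplayListenerBus()
val appListener = new ApplicationEventListener()
bus.addListener(appListener)

// Keep only application start/end events; everything else is skipped
// before JSON deserialization, which is the point of this change.
val onlyAppEvents: ReplayEventsFilter = { line =>
  line.startsWith("{\"Event\":\"SparkListenerApplicationStart\"") ||
    line.startsWith("{\"Event\":\"SparkListenerApplicationEnd\"")
}

// Hypothetical, uncompressed event log file.
val logInput = new FileInputStream("/tmp/spark-events/app-1234")
try {
  bus.replay(logInput, "app-1234", maybeTruncated = false, eventsFilter = onlyAppEvents)
} finally {
  logInput.close()
}

// Populated only if an ApplicationStart event made it through the filter.
println(appListener.appId)
```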

