spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vanzin <...@git.apache.org>
Subject [GitHub] spark pull request #15556: [SPARK-18010][Core] Reduce work performed for bui...
Date Thu, 20 Oct 2016 20:57:11 GMT
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15556#discussion_r84373054
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
    @@ -43,38 +43,56 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with
Logging {
        * @param sourceName Filename (or other source identifier) from whence @logData is
being read
        * @param maybeTruncated Indicate whether log file might be truncated (some abnormal
situations
        *        encountered, log file might not finished writing) or not
    +   * @param eventsFilter Filter function to select JSON event strings in the log data
stream that
    +   *        should be parsed and replayed. When not specified, all event strings in the
log data
    +   *        are parsed and replayed.
        */
       def replay(
           logData: InputStream,
           sourceName: String,
    -      maybeTruncated: Boolean = false): Unit = {
    -    var currentLine: String = null
    -    var lineNumber: Int = 1
    +      maybeTruncated: Boolean = false,
    +      eventsFilter: (String) => Boolean = ReplayListenerBus.SELECT_ALL_FILTER): Unit
= {
         try {
    -      val lines = Source.fromInputStream(logData).getLines()
    -      while (lines.hasNext) {
    -        currentLine = lines.next()
    +      val lineEntries = Source.fromInputStream(logData)
    +        .getLines()
    +        .zipWithIndex
    +        .filter(entry => eventsFilter(entry._1))
    +
    +      var entry: (String, Int) = ("", 0)
    +
    +      while (lineEntries.hasNext) {
             try {
    -          postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
    +          entry = lineEntries.next()
    --- End diff --
    
    nit: `val (line, lineno) = lineEntries.next()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message