spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-7336] [HISTORYSERVER] Fix bug that applications status incorrect on JobHistory UI.
Date Wed, 02 Sep 2015 18:03:46 GMT
Repository: spark
Updated Branches:
  refs/heads/master 00d9af5e1 -> c3b881a7d


[SPARK-7336] [HISTORYSERVER] Fix bug that applications status incorrect on JobHistory UI.

Author: ArcherShao <shaochuan@huawei.com>

Closes #5886 from ArcherShao/SPARK-7336.
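
In short (as far as the diff below shows): checkForLogs() used to track the
largest modification time seen among the event logs themselves
(lastModifiedTime) and reuse it as the cutoff for the next scan. The patch
renames that field to lastScanTime and derives the cutoff from the filesystem
instead: a new getNewLastScanTime() helper creates a hidden, randomly named
file in the log directory, reads its modification time, and deletes it again.
The new cutoff is captured before the directory listing and stored only after
a successful scan, so the time >= lastScanTime filter compares log timestamps
against the filesystem's own clock rather than against values carried over
from previously processed logs.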


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3b881a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3b881a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3b881a7

Branch: refs/heads/master
Commit: c3b881a7d7e4736f7131ff002a80e25def1f63af
Parents: 00d9af5
Author: Chuan Shao <shaochuan@huawei.com>
Authored: Wed Sep 2 11:02:27 2015 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Wed Sep 2 11:02:57 2015 -0700

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 27 ++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c3b881a7/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index e573ff1..a5755ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.history
 
 import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
+import java.util.UUID
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -73,7 +74,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // The modification time of the newest log detected during the last scan. This is used
   // to ignore logs that are older during subsequent scans, to avoid processing data that
   // is already known.
-  private var lastModifiedTime = -1L
+  private var lastScanTime = -1L
 
  // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
  // into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -179,15 +180,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private[history] def checkForLogs(): Unit = {
     try {
+      val newLastScanTime = getNewLastScanTime()
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
         .getOrElse(Seq[FileStatus]())
-      var newLastModifiedTime = lastModifiedTime
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
           try {
             getModificationTime(entry).map { time =>
-              newLastModifiedTime = math.max(newLastModifiedTime, time)
-              time >= lastModifiedTime
+              time >= lastScanTime
             }.getOrElse(false)
           } catch {
             case e: AccessControlException =>
@@ -224,12 +224,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           }
         }
 
-      lastModifiedTime = newLastModifiedTime
+      lastScanTime = newLastScanTime
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
   }
 
+  private def getNewLastScanTime(): Long = {
+    val fileName = "." + UUID.randomUUID().toString
+    val path = new Path(logDir, fileName)
+    val fos = fs.create(path)
+
+    try {
+      fos.close()
+      fs.getFileStatus(path).getModificationTime
+    } catch {
+      case e: Exception =>
+        logError("Exception encountered when attempting to update last scan time", e)
+        lastScanTime
+    } finally {
+      fs.delete(path)
+    }
+  }
+
   override def writeEventLogs(
       appId: String,
       attemptId: Option[String],
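
----------------------------------------------------------------------
For reference, the probe-file trick at the heart of getNewLastScanTime() can
be reduced to a small standalone sketch against the Hadoop FileSystem API.
The object and method names below (FsClockProbe, filesystemNow) and the
/tmp/spark-events directory are illustrative only, not part of Spark; the
error handling mirrors the patch's fallback to the previous scan time.

import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Minimal sketch: ask a Hadoop-compatible filesystem what time it thinks
// it is by creating a hidden probe file and reading its modification time.
object FsClockProbe {

  def filesystemNow(fs: FileSystem, dir: Path, fallback: Long): Long = {
    // Hidden, randomly named file so concurrent scans cannot collide and
    // the history provider will not mistake it for an event log.
    val probe = new Path(dir, "." + UUID.randomUUID().toString)
    try {
      fs.create(probe).close()
      fs.getFileStatus(probe).getModificationTime
    } catch {
      case _: Exception =>
        // On failure, keep the previous cutoff instead of guessing.
        fallback
    } finally {
      fs.delete(probe, false)
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    // Hypothetical event log directory; substitute spark.history.fs.logDirectory.
    val logDir = new Path("/tmp/spark-events")
    println("Filesystem clock reads: " + filesystemNow(fs, logDir, -1L))
  }
}

Reading the clock this way keeps the scan cutoff consistent with the
timestamps the filesystem assigns to the event logs, which is the comparison
checkForLogs() relies on.
----------------------------------------------------------------------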


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

