spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-5582] [history] Ignore empty log directories.
Date Fri, 06 Feb 2015 10:07:25 GMT
Repository: spark
Updated Branches:
  refs/heads/master 24dbc50b9 -> 856928979


[SPARK-5582] [history] Ignore empty log directories.

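Empty log directories are not useful at the moment, but if one ends
up showing in the log root, it breaks the code that scans the log root
for new logs: computing the modification time of an empty directory
takes the `max` of an empty listing, which throws an exception.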

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #4352 from vanzin/SPARK-5582 and squashes the following commits:

1a6a3d4 [Marcelo Vanzin] [SPARK-5582] Fix exception when looking at empty directories.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85692897
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85692897
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85692897

Branch: refs/heads/master
Commit: 856928979f7d00fbb518fc1102a2e7d80cea0f7c
Parents: 24dbc50
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Fri Feb 6 10:07:20 2015 +0000
Committer: Sean Owen <sowen@cloudera.com>
Committed: Fri Feb 6 10:07:20 2015 +0000

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 22 +++++++++++++-------
 .../deploy/history/FsHistoryProviderSuite.scala | 18 ++++++++++++++++
 2 files changed, 32 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/85692897/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 0ae45f4..92125f2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -173,9 +173,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       val logInfos = statusList
         .filter { entry =>
           try {
-            val modTime = getModificationTime(entry)
-            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
-            modTime >= lastModifiedTime
+            getModificationTime(entry).map { time =>
+              newLastModifiedTime = math.max(newLastModifiedTime, time)
+              time >= lastModifiedTime
+            }.getOrElse(false)
           } catch {
             case e: AccessControlException =>
               // Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -251,7 +252,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         appListener.appName.getOrElse(NOT_STARTED),
         appListener.startTime.getOrElse(-1L),
         appListener.endTime.getOrElse(-1L),
-        getModificationTime(eventLog),
+        getModificationTime(eventLog).get,
         appListener.sparkUser.getOrElse(NOT_STARTED),
         isApplicationCompleted(eventLog))
     } finally {
@@ -310,11 +311,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
 
-  private def getModificationTime(fsEntry: FileStatus): Long = {
-    if (fsEntry.isDir) {
-      fs.listStatus(fsEntry.getPath).map(_.getModificationTime()).max
+  /**
+   * Returns the modification time of the given event log. If the status points at an empty
+   * directory, `None` is returned, indicating that there isn't an event log at that location.
+   */
+  private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
+    if (isLegacyLogDirectory(fsEntry)) {
+      val statusList = fs.listStatus(fsEntry.getPath)
+      if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
     } else {
-      fsEntry.getModificationTime()
+      Some(fsEntry.getModificationTime())
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/85692897/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 3fbc1a2..1d95432 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -190,6 +190,24 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
   }
 
+  test("SPARK-5582: empty log directory") {
+    val conf = new SparkConf()
+      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+    val provider = new FsHistoryProvider(conf)
+
+    val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+    writeFile(logFile1, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+      SparkListenerApplicationEnd(2L))
+
+    val oldLog = new File(testDir, "old1")
+    oldLog.mkdir()
+
+    provider.checkForLogs()
+    val appListAfterRename = provider.getListing()
+    appListAfterRename.size should be (1)
+  }
+
   private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
     events: SparkListenerEvent*) = {
     val out =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message