spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [11/16] git commit: Removed slack time in file stream and added better handling of failures caused by FileNotFound exceptions.
Date Tue, 31 Dec 2013 18:13:14 GMT
Removed slack time in file stream and added better handling of failures caused by
FileNotFound exceptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bacc65cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bacc65cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bacc65cf

Branch: refs/heads/master
Commit: bacc65cf28b9f95b129e9adede43f684f2c5ced3
Parents: d4dfab5
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Thu Dec 26 10:18:46 2013 +0000
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Thu Dec 26 10:18:46 2013 +0000

----------------------------------------------------------------------
 .../streaming/dstream/FileInputDStream.scala    | 59 +++++++-------------
 .../spark/streaming/CheckpointSuite.scala       |  6 --
 .../spark/streaming/InputStreamsSuite.scala     |  6 --
 3 files changed, 21 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bacc65cf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index d6514a1..b163b13 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -40,9 +40,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V]: Clas
 
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
-  // Max attempts to try if listing files fail
-  val MAX_ATTEMPTS = 10
-
   // Latest file mod time seen till any point of time
   private val prevModTimeFiles = new HashSet[String]()
   private var prevModTime = 0L
@@ -109,19 +106,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V]: Clas
    * (new files found, latest modification time among them, files with latest modification time)
    */
   private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
-    logDebug("Trying to get new files for time " + currentTime)
-    var attempts = 0
-    while (attempts < MAX_ATTEMPTS) {
-      attempts += 1
-      try {
-        val filter = new CustomPathFilter(currentTime)
-        val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
-        return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
-      } catch {
-        case ioe: IOException =>
-          logWarning("Attempt " + attempts + " to get new files failed", ioe)
-          reset()
-      }
+    try {
+      logDebug("Trying to get new files for time " + currentTime)
+      val filter = new CustomPathFilter(currentTime)
+      val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+      return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
+    } catch {
+      case e: Exception =>
+        logError("Attempt to get new files failed", e)
+        reset()
     }
     (Seq.empty, -1, Seq.empty)
   }
@@ -193,22 +186,17 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V]: Clas
    * been seen before (i.e. the file should not be in lastModTimeFiles)
    */
   private[streaming]
-  class CustomPathFilter(currentTime: Long) extends PathFilter {
+  class CustomPathFilter(maxModTime: Long) extends PathFilter {
     // Latest file mod time seen in this round of fetching files and its corresponding files
     var latestModTime = 0L
     val latestModTimeFiles = new HashSet[String]()
 
-    // Creating an RDD from a HDFS file immediately after the file is created sometime returns
-    // an RDD with 0 partitions. To avoid that, we introduce a slack time - files that are older
-    // than slack time from current time is considered for processing.
-    val slackTime = System.getProperty("spark.streaming.fileStream.slackTime", "2000").toLong
-    val maxModTime = currentTime - slackTime
-
     def accept(path: Path): Boolean = {
-      if (!filter(path)) {  // Reject file if it does not satisfy filter
-        logDebug("Rejected by filter " + path)
-        return false
-      } else {              // Accept file only if
+      try {
+        if (!filter(path)) {  // Reject file if it does not satisfy filter
+          logDebug("Rejected by filter " + path)
+          return false
+        }
         val modTime = fs.getFileStatus(path).getModificationTime()
         logDebug("Mod time for " + path + " is " + modTime)
         if (modTime < prevModTime) {
@@ -228,8 +216,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V]: Clas
         }
         latestModTimeFiles += path.toString
         logDebug("Accepted " + path)
-        return true
+      } catch {
+        case fnfe: java.io.FileNotFoundException => 
+          logWarning("Error finding new files", fnfe)
+          reset()
+          return false
       }
+      return true
     }
   }
 }
@@ -237,14 +230,4 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V]: Clas
 private[streaming]
 object FileInputDStream {
   def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
-
-  // Disable slack time (i.e. set it to zero)
-  private[streaming] def disableSlackTime() {
-    System.setProperty("spark.streaming.fileStream.slackTime", "0")
-  }
-
-  // Restore default value of slack time
-  private[streaming] def restoreSlackTime() {
-    System.clearProperty("spark.streaming.fileStream.slackTime")
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bacc65cf/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 0347cc1..4e25c95 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -200,9 +200,6 @@ class CheckpointSuite extends TestSuiteBase {
     val clockProperty = System.getProperty("spark.streaming.clock")
     System.clearProperty("spark.streaming.clock")
 
-    // Disable slack time of file stream when testing with local file system
-    FileInputDStream.disableSlackTime()
-
     // Set up the streaming context and input streams
     val testDir = Files.createTempDir()
     var ssc = new StreamingContext(master, framework, Seconds(1))
@@ -303,9 +300,6 @@ class CheckpointSuite extends TestSuiteBase {
     // Enable manual clock back again for other tests
     if (clockProperty != null)
       System.setProperty("spark.streaming.clock", clockProperty)
-
-    // Restore the default slack time
-    FileInputDStream.restoreSlackTime()
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bacc65cf/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index e506c95..5fa14ad 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -152,9 +152,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // Disable manual clock as FileInputDStream does not work with manual clock
     System.clearProperty("spark.streaming.clock")
 
-    // Disable slack time of file stream when testing with local file system
-    FileInputDStream.disableSlackTime()
-
     // Set up the streaming context and input streams
     val testDir = Files.createTempDir()
     val ssc = new StreamingContext(master, framework, batchDuration)
@@ -199,9 +196,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Enable manual clock back again for other tests
     System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-
-    // Restore the default slack time
-    FileInputDStream.restoreSlackTime()
   }
 
 


Mime
View raw message