spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [01/16] git commit: Fixed multiple file stream and checkpointing bugs. - Made file stream more robust to transient failures. - Changed Spark.setCheckpointDir API to not have the second 'useExisting' parameter. Spark will always create a unique director
Date Tue, 31 Dec 2013 18:13:04 GMT
Updated Branches:
  refs/heads/master 50e3b8ec4 -> 55b7e2fdf


Fixed multiple file stream and checkpointing bugs.
- Made file stream more robust to transient failures.
- Changed Spark.setCheckpointDir API to not have the second
  'useExisting' parameter. Spark will always create a unique directory
  for checkpointing underneath the directory provided to the function.
- Fixed bug wrt local relative paths as checkpoint directory.
- Made DStream and RDD checkpointing use
  SparkContext.hadoopConfiguration, so that more HDFS compatible
  filesystems are supported for checkpointing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5e9ce83d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5e9ce83d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5e9ce83d

Branch: refs/heads/master
Commit: 5e9ce83d682d6198cda4631faf11cb53fcccf07f
Parents: 6169fe1
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Wed Dec 11 14:01:36 2013 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed Dec 11 14:01:36 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  25 ++--
 .../spark/api/java/JavaSparkContext.scala       |  15 +--
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  27 +++-
 .../apache/spark/rdd/RDDCheckpointData.scala    |  14 +-
 .../scala/org/apache/spark/JavaAPISuite.java    |   4 +-
 .../org/apache/spark/streaming/Checkpoint.scala |   6 +-
 .../org/apache/spark/streaming/Scheduler.scala  |   2 +-
 .../spark/streaming/StreamingContext.scala      |  15 ++-
 .../streaming/dstream/FileInputDStream.scala    | 130 ++++++++++++-------
 .../spark/streaming/CheckpointSuite.scala       |  38 +++---
 10 files changed, 159 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e9ce83d/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 66006bf..1811bfa 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.io._
 import java.net.URI
-import java.util.Properties
+import java.util.{UUID, Properties}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.Map
@@ -857,22 +857,15 @@ class SparkContext(
 
   /**
    * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be a HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists and useExisting is set to true, then the
-   * exisiting directory will be used. Otherwise an exception will be thrown to
-   * prevent accidental overriding of checkpoint files in the existing directory.
+   * be a HDFS path if running on a cluster.
    */
-  def setCheckpointDir(dir: String, useExisting: Boolean = false) {
-    val path = new Path(dir)
-    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
-    if (!useExisting) {
-      if (fs.exists(path)) {
-        throw new Exception("Checkpoint directory '" + path + "' already exists.")
-      } else {
-        fs.mkdirs(path)
-      }
-    }
-    checkpointDir = Some(dir)
+  def setCheckpointDir(directory: String) {
+    checkpointDir = Option(directory).map(dir => {
+      val path = new Path(dir, UUID.randomUUID().toString)
+      val fs = path.getFileSystem(hadoopConfiguration)
+      fs.mkdirs(path)
+      fs.getFileStatus(path).getPath().toString
+    })
   }
 
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD).
*/

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e9ce83d/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 8869e07..c63db49 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -385,20 +385,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
 
   /**
    * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be a HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists and useExisting is set to true, then the
-   * exisiting directory will be used. Otherwise an exception will be thrown to
-   * prevent accidental overriding of checkpoint files in the existing directory.
-   */
-  def setCheckpointDir(dir: String, useExisting: Boolean) {
-    sc.setCheckpointDir(dir, useExisting)
-  }
-
-  /**
-   * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be a HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists, an exception will be thrown to prevent accidental
-   * overriding of checkpoint files.
+   * be a HDFS path if running on a cluster.
    */
   def setCheckpointDir(dir: String) {
     sc.setCheckpointDir(dir)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e9ce83d/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index d3033ea..ef4057e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.util.ReflectionUtils
 import org.apache.hadoop.fs.Path
 import java.io.{File, IOException, EOFException}
 import java.text.NumberFormat
+import org.apache.spark.broadcast.Broadcast
 
 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
 
@@ -36,6 +37,8 @@ private[spark]
 class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
   extends RDD[T](sc, Nil) {
 
+  val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+
   @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
 
   override def getPartitions: Array[Partition] = {
@@ -67,7 +70,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath:
Stri
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
-    CheckpointRDD.readFromFile(file, context)
+    CheckpointRDD.readFromFile(file, broadcastedConf, context)
   }
 
   override def checkpoint() {
@@ -81,10 +84,14 @@ private[spark] object CheckpointRDD extends Logging {
     "part-%05d".format(splitId)
   }
 
-  def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T])
{
+  def writeToFile[T](
+      path: String,
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      blockSize: Int = -1
+    )(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
     val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+    val fs = outputDir.getFileSystem(broadcastedConf.value.value)
 
     val finalOutputName = splitIdToFile(ctx.partitionId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -121,9 +128,13 @@ private[spark] object CheckpointRDD extends Logging {
     }
   }
 
-  def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
+  def readFromFile[T](
+      path: Path,
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      context: TaskContext
+    ): Iterator[T] = {
     val env = SparkEnv.get
-    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+    val fs = path.getFileSystem(broadcastedConf.value.value)
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
@@ -146,8 +157,10 @@ private[spark] object CheckpointRDD extends Logging {
     val sc = new SparkContext(cluster, "CheckpointRDD Test")
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
-    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
-    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
+    val conf = SparkHadoopUtil.get.newConfiguration()
+    val fs = path.getFileSystem(conf)
+    val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
+    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not
the same")
     assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e9ce83d/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 6009a41..3160ab9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.{Partition, SparkException, Logging}
+import org.apache.spark.{SerializableWritable, Partition, SparkException, Logging}
 import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
 
 /**
@@ -83,14 +83,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
 
     // Create the output path for the checkpoint
     val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
-    val fs = path.getFileSystem(new Configuration())
+    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
     if (!fs.mkdirs(path)) {
       throw new SparkException("Failed to create checkpoint path " + path)
     }
 
     // Save to file, and reload it as an RDD
-    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+    val broadcastedConf = rdd.context.broadcast(new SerializableWritable(rdd.context.hadoopConfiguration))
+    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
     val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
+    if (newRDD.partitions.size != rdd.partitions.size) {
+      throw new Exception(
+        "Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
+          "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
+    }
 
     // Change the dependencies and partitions of the RDD
     RDDCheckpointData.synchronized {
@@ -99,8 +105,8 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
       rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
       cpState = Checkpointed
       RDDCheckpointData.clearTaskCaches()
-      logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
     }
+    logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " +
newRDD.id)
   }
 
   // Get preferred location of a split after checkpointing

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e9ce83d/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 4234f6e..ee5d8c9 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -851,7 +851,7 @@ public class JavaAPISuite implements Serializable {
   public void checkpointAndComputation() {
     File tempDir = Files.createTempDir();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+    sc.setCheckpointDir(tempDir.getAbsolutePath());
     Assert.assertEquals(false, rdd.isCheckpointed());
     rdd.checkpoint();
     rdd.count(); // Forces the DAG to cause a checkpoint
@@ -863,7 +863,7 @@ public class JavaAPISuite implements Serializable {
   public void checkpointAndRestore() {
     File tempDir = Files.createTempDir();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+    sc.setCheckpointDir(tempDir.getAbsolutePath());
     Assert.assertEquals(false, rdd.isCheckpointed());
     rdd.checkpoint();
     rdd.count(); // Forces the DAG to cause a checkpoint

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e9ce83d/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9271914..bcf5e6b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.deploy.SparkHadoopUtil
 
 
 private[streaming]
@@ -57,7 +58,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
  * Convenience class to speed up the writing of graph checkpoint to file
  */
 private[streaming]
-class CheckpointWriter(checkpointDir: String) extends Logging {
+class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging
{
   val file = new Path(checkpointDir, "graph")
   // The file to which we actually write - and then "move" to file.
   private val writeFile = new Path(file.getParent, file.getName + ".next")
@@ -65,8 +66,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
 
   private var stopped = false
 
-  val conf = new Configuration()
-  var fs = file.getFileSystem(conf)
+  var fs = file.getFileSystem(hadoopConf)
   val maxAttempts = 3
   val executor = Executors.newFixedThreadPool(1)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e9ce83d/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
index ed892e3..4cd8695 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
@@ -29,7 +29,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
   val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
   val jobManager = new JobManager(ssc, concurrentJobs)
   val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir
!= null) {
-    new CheckpointWriter(ssc.checkpointDir)
+    new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
   } else {
     null
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e9ce83d/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 70bf902..d6fc2a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
 import twitter4j.Status
 import twitter4j.auth.Authorization
+import org.apache.spark.deploy.SparkHadoopUtil
 
 
 /**
@@ -85,7 +86,6 @@ class StreamingContext private (
          null, batchDuration)
   }
 
-
   /**
    * Re-create a StreamingContext from a checkpoint file.
    * @param path Path either to the directory that was specified as the checkpoint directory,
or
@@ -139,7 +139,7 @@ class StreamingContext private (
 
   protected[streaming] var checkpointDir: String = {
     if (isCheckpointPresent) {
-      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
+      sc.setCheckpointDir(cp_.checkpointDir)
       cp_.checkpointDir
     } else {
       null
@@ -173,8 +173,12 @@ class StreamingContext private (
    */
   def checkpoint(directory: String) {
     if (directory != null) {
-      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
-      checkpointDir = directory
+      val path = new Path(directory)
+      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
+      fs.mkdirs(path)
+      val fullPath = fs.getFileStatus(path).getPath().toString
+      sc.setCheckpointDir(fullPath)
+      checkpointDir = fullPath
     } else {
       checkpointDir = null
     }
@@ -595,8 +599,9 @@ object StreamingContext {
       prefix + "-" + time.milliseconds + "." + suffix
     }
   }
-
+  /*
   protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
     new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
   }
+  */
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e9ce83d/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index fea0573..1a8db3a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -39,8 +39,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
   // Latest file mod time seen till any point of time
-  private val lastModTimeFiles = new HashSet[String]()
-  private var lastModTime = 0L
+  private val prevModTimeFiles = new HashSet[String]()
+  private var prevModTime = 0L
 
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
@@ -48,11 +48,11 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
 
   override def start() {
     if (newFilesOnly) {
-      lastModTime = graph.zeroTime.milliseconds
+      prevModTime = graph.zeroTime.milliseconds
     } else {
-      lastModTime = 0
+      prevModTime = 0
     }
-    logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
+    logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
   }
   
   override def stop() { }
@@ -67,55 +67,22 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
    * the previous call.
    */
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-    assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really
old time [" + validTime + " < " + lastModTime)
-
-    // Create the filter for selecting new files
-    val newFilter = new PathFilter() {
-      // Latest file mod time seen in this round of fetching files and its corresponding
files
-      var latestModTime = 0L
-      val latestModTimeFiles = new HashSet[String]()
-
-      def accept(path: Path): Boolean = {
-        if (!filter(path)) {  // Reject file if it does not satisfy filter
-          logDebug("Rejected by filter " + path)
-          return false
-        } else {              // Accept file only if
-          val modTime = fs.getFileStatus(path).getModificationTime()
-          logDebug("Mod time for " + path + " is " + modTime)
-          if (modTime < lastModTime) {
-            logDebug("Mod time less than last mod time")
-            return false  // If the file was created before the last time it was called
-          } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString))
{
-            logDebug("Mod time equal to last mod time, but file considered already")
-            return false  // If the file was created exactly as lastModTime but not reported
yet
-          } else if (modTime > validTime.milliseconds) {
-            logDebug("Mod time more than valid time")
-            return false  // If the file was created after the time this function call requires
-          }
-          if (modTime > latestModTime) {
-            latestModTime = modTime
-            latestModTimeFiles.clear()
-            logDebug("Latest mod time updated to " + latestModTime)
-          }
-          latestModTimeFiles += path.toString
-          logDebug("Accepted " + path)
-          return true
-        }        
-      }
-    }
-    logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
-    val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
+    assert(validTime.milliseconds >= prevModTime,
+      "Trying to get new files for really old time [" + validTime + " < " + prevModTime)
+
+    // Find new files
+    val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
     if (newFiles.length > 0) {
       // Update the modification time and the files processed for that modification time
-      if (lastModTime != newFilter.latestModTime) {
-        lastModTime = newFilter.latestModTime
-        lastModTimeFiles.clear()
+      if (prevModTime < latestModTime) {
+        prevModTime = latestModTime
+        prevModTimeFiles.clear()
       }
-      lastModTimeFiles ++= newFilter.latestModTimeFiles
-      logDebug("Last mod time updated to " + lastModTime)
+      prevModTimeFiles ++= latestModTimeFiles
+      logDebug("Last mod time updated to " + prevModTime)
     }
-    files += ((validTime, newFiles))
+    files += ((validTime, newFiles.toArray))
     Some(filesToRDD(newFiles))
   }
 
@@ -130,8 +97,30 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
       oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
   }
 
+  /**
+   * Finds files which have modification timestamp <= current time. If some files are
being
+   * deleted in the directory, then it can generate transient exceptions. Hence, multiple
+   * attempts are made to handle these transient exceptions. Returns 3-tuple
+   * (new files found, latest modification time among them, files with latest modification
time)
+   */
+  private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
+    logDebug("Trying to get new files for time " + currentTime)
+    var attempts = 0
+    while (attempts < FileInputDStream.MAX_ATTEMPTS) {
+      attempts += 1
+      try {
+        val filter = new CustomPathFilter(currentTime)
+        val newFiles = fs.listStatus(path, filter)
+        return (newFiles.map(_.getPath.toString), filter.latestModTime, filter.latestModTimeFiles.toSeq)
+      } catch {
+        case ioe: IOException => logWarning("Attempt " + attempts + " to get new files
failed", ioe)
+      }
+    }
+    (Seq(), -1, Seq())
+  }
+
   /** Generate one RDD from an array of files */
-  protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
+  private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     new UnionRDD(
       context.sparkContext,
       files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
@@ -189,10 +178,51 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
         hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
     }
   }
+
+  /**
+   * PathFilter to find new files that have modification timestamps <= current time, but
have not
+   * been seen before (i.e. the file should not be in lastModTimeFiles)
+   * @param currentTime
+   */
+  private[streaming]
+  class CustomPathFilter(currentTime: Long) extends PathFilter() {
+    // Latest file mod time seen in this round of fetching files and its corresponding files
+    var latestModTime = 0L
+    val latestModTimeFiles = new HashSet[String]()
+
+    def accept(path: Path): Boolean = {
+      if (!filter(path)) {  // Reject file if it does not satisfy filter
+        logDebug("Rejected by filter " + path)
+        return false
+      } else {              // Accept file only if
+      val modTime = fs.getFileStatus(path).getModificationTime()
+        logDebug("Mod time for " + path + " is " + modTime)
+        if (modTime < prevModTime) {
+          logDebug("Mod time less than last mod time")
+          return false  // If the file was created before the last time it was called
+        } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString))
{
+          logDebug("Mod time equal to last mod time, but file considered already")
+          return false  // If the file was created exactly as lastModTime but not reported
yet
+        } else if (modTime > currentTime) {
+          logDebug("Mod time more than valid time")
+          return false  // If the file was created after the time this function call requires
+        }
+        if (modTime > latestModTime) {
+          latestModTime = modTime
+          latestModTimeFiles.clear()
+          logDebug("Latest mod time updated to " + latestModTime)
+        }
+        latestModTimeFiles += path.toString
+        logDebug("Accepted " + path)
+        return true
+      }
+    }
+  }
 }
 
 private[streaming]
 object FileInputDStream {
+  val MAX_ATTEMPTS = 10
   def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e9ce83d/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index beb2083..e517549 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -25,8 +25,10 @@ import org.scalatest.BeforeAndAfter
 import org.apache.commons.io.FileUtils
 import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import util.{Clock, ManualClock}
-import scala.util.Random
 import com.google.common.io.Files
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.conf.Configuration
+
 
 
 /**
@@ -44,7 +46,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
 
   after {
     if (ssc != null) ssc.stop()
-    FileUtils.deleteDirectory(new File(checkpointDir))
+    //FileUtils.deleteDirectory(new File(checkpointDir))
 
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
@@ -66,7 +68,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
 
     val stateStreamCheckpointInterval = Seconds(1)
-
+    val fs = FileSystem.getLocal(new Configuration())
     // this ensure checkpointing occurs at least once
     val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
     val secondNumBatches = firstNumBatches
@@ -90,11 +92,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     ssc.start()
     advanceTimeWithRealDelay(ssc, firstNumBatches)
     logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
-    assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in
state stream before first failure")
+    assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
+      "No checkpointed RDDs in state stream before first failure")
     stateStream.checkpointData.checkpointFiles.foreach {
-      case (time, data) => {
-        val file = new File(data.toString)
-        assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state
stream before first failure does not exist")
+      case (time, file) => {
+        assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time
+
+            " for state stream before first failure does not exist")
       }
     }
 
@@ -102,7 +105,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     // and check whether the earlier checkpoint files are deleted
     val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
     advanceTimeWithRealDelay(ssc, secondNumBatches)
-    checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file +
"' was not deleted"))
+    checkpointFiles.foreach(file =>
+      assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
     ssc.stop()
 
     // Restart stream computation using the checkpoint file and check whether
@@ -110,19 +114,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     ssc = new StreamingContext(checkpointDir)
     stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
     logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n")
+ "]")
-    assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery
from first failure")
+    assert(!stateStream.generatedRDDs.isEmpty,
+      "No restored RDDs in state stream after recovery from first failure")
 
 
     // Run one batch to generate a new checkpoint file and check whether some RDD
     // is present in the checkpoint data or not
     ssc.start()
     advanceTimeWithRealDelay(ssc, 1)
-    assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in
state stream before second failure")
+    assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
+      "No checkpointed RDDs in state stream before second failure")
     stateStream.checkpointData.checkpointFiles.foreach {
-      case (time, data) => {
-        val file = new File(data.toString)
-        assert(file.exists(),
-          "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds
failure does not exist")
+      case (time, file) => {
+        assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time
+
+          " for state stream before seconds failure does not exist")
       }
     }
     ssc.stop()
@@ -132,7 +137,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     ssc = new StreamingContext(checkpointDir)
     stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
     logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n")
+ "]")
-    assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery
from second failure")
+    assert(!stateStream.generatedRDDs.isEmpty,
+      "No restored RDDs in state stream after recovery from second failure")
 
     // Adjust manual clock time as if it is being restarted after a delay
     System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds *
7).toString)
@@ -143,6 +149,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     ssc = null
   }
 
+
   // This tests whether the systm can recover from a master failure with simple
   // non-stateful operations. This assumes as reliable, replayable input
   // source - TestInputDStream.
@@ -191,6 +198,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+
   // This tests whether file input stream remembers what files were seen before
   // the master failure and uses them again to process a large window operation.
   // It also tests whether batches, whose processing was incomplete due to the


Mime
View raw message