spark-commits mailing list archives

From pwend...@apache.org
Subject [15/20] git commit: Fixed conf/slaves and updated docs.
Date Sat, 11 Jan 2014 00:26:09 GMT
Fixed conf/slaves and updated docs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/740730a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/740730a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/740730a1

Branch: refs/heads/master
Commit: 740730a17901f914d0e9d470b8f40e30be33a9bb
Parents: 38d75e1
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Fri Jan 10 05:06:15 2014 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Fri Jan 10 05:06:15 2014 -0800

----------------------------------------------------------------------
 conf/slaves                                        |  3 ++-
 .../org/apache/spark/util/TimeStampedHashMap.scala |  9 ++++++---
 .../spark/streaming/DStreamCheckpointData.scala    | 17 +++++++++++++----
 .../spark/streaming/dstream/FileInputDStream.scala |  3 ++-
 4 files changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/740730a1/conf/slaves
----------------------------------------------------------------------
diff --git a/conf/slaves b/conf/slaves
index 2fbb50c..da0a013 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1 +1,2 @@
-localhost
+# A Spark Worker will be started on each of the machines listed below.
+localhost
\ No newline at end of file

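The new comment in conf/slaves documents what the file is for: a Spark Worker is launched on every host listed in it, one per line. A populated file might look like the sketch below (the hostnames are hypothetical examples, not part of this commit):

    # A Spark Worker will be started on each of the machines listed below.
    worker1.example.com
    worker2.example.com
    worker3.example.com
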
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/740730a1/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index 9ce4ef7..dde504f 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -26,9 +26,12 @@ import org.apache.spark.Logging
 
 /**
  * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
- * time stamp along with each key-value pair. Key-value pairs that are older than a particular
- * threshold time can them be removed using the clearOldValues method. This is intended to be a drop-in
- * replacement of scala.collection.mutable.HashMap.
+ * timestamp along with each key-value pair. If specified, the timestamp of each pair can be
+ * updated every time it is accessed. Key-value pairs whose timestamps are older than a particular
+ * threshold time can then be removed using the clearOldValues method. This is intended to
+ * be a drop-in replacement of scala.collection.mutable.HashMap.
+ * @param updateTimeStampOnGet When enabled, the timestamp of a pair will be
+ *                             updated when it is accessed
  */
 class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
   extends Map[A, B]() with Logging {

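For context, the updated scaladoc describes the intended usage of TimeStampedHashMap: entries carry an insertion timestamp, optionally refreshed on reads, and clearOldValues drops entries older than a threshold. A minimal usage sketch (not part of this commit; the key, value, and threshold are illustrative only):

    import org.apache.spark.util.TimeStampedHashMap

    // Refresh an entry's timestamp whenever it is read, so recently
    // accessed pairs survive clearOldValues().
    val cache = new TimeStampedHashMap[String, Int](updateTimeStampOnGet = true)
    cache("answer") = 42
    cache.get("answer")  // Some(42); also refreshes the entry's timestamp
    // Drop every pair whose timestamp is older than the threshold (in ms).
    cache.clearOldValues(System.currentTimeMillis() - 60 * 1000)
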
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/740730a1/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
index 1589bc1..671f7bb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
@@ -34,8 +34,10 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
 
   // Mapping of the batch time to the checkpointed RDD file of that time
   @transient private var timeToCheckpointFile = new HashMap[Time, String]
-  // Mapping of the batch time to the time of the oldest checkpointed RDD in that batch's checkpoint data
+  // Mapping of the batch time to the time of the oldest checkpointed RDD
+  // in that batch's checkpoint data
   @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
+
   @transient private var fileSystem : FileSystem = null
   protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
 
@@ -55,19 +57,26 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
     if (!checkpointFiles.isEmpty) {
       currentCheckpointFiles.clear()
       currentCheckpointFiles ++= checkpointFiles
+      // Add the current checkpoint files to the map of all checkpoint files
+      // This will be used to delete old checkpoint files
       timeToCheckpointFile ++= currentCheckpointFiles
+      // Remember the time of the oldest checkpoint RDD in current state
       timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
     }
   }
 
   /**
-   * Cleanup old checkpoint data. This gets called every time the graph
-   * checkpoint is initiated, but after `update` is called. Default
-   * implementation, cleans up old checkpoint files.
+   * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been
+   * written to the checkpoint directory.
    */
   def cleanup(time: Time) {
+    // Get the time of the oldest checkpointed RDD that was written as part of the
+    // checkpoint of `time`
     timeToOldestCheckpointFileTime.remove(time) match {
       case Some(lastCheckpointFileTime) =>
+        // Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime`
+        // This is because checkpointed RDDs older than this are not going to be needed
+        // even after master fails, as the checkpoint data of `time` does not refer to those files
         val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
         logDebug("Files to delete:\n" + filesToDelete.mkString(","))
         filesToDelete.foreach {

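The new comments in cleanup(time) describe a simple rule: once the oldest checkpoint time referenced by the checkpoint of `time` is known, any checkpoint file keyed by an earlier time can be deleted, because a recovering master will never need it. A standalone sketch of that filtering step, using Longs in place of Spark's Time class (names and values are illustrative only):

    import scala.collection.mutable.HashMap

    // Batch time (ms) -> checkpoint file written for that batch.
    val timeToCheckpointFile =
      HashMap(1000L -> "chk-1000", 2000L -> "chk-2000", 3000L -> "chk-3000")
    // Oldest checkpointed RDD still referenced by the latest checkpoint.
    val lastCheckpointFileTime = 2000L
    // Anything strictly older is no longer needed, even after a master failure.
    val filesToDelete = timeToCheckpointFile.filter { case (t, _) => t < lastCheckpointFileTime }
    // filesToDelete now contains only 1000L -> "chk-1000"
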
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/740730a1/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 4585e3f..38aa119 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -98,7 +98,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
       (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
     logDebug("Cleared files are:\n" +
       oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
-    // Delete file times that weren't accessed in the last round of getting new files
+    // Delete file mod times that weren't accessed in the last round of getting new files
     fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
   }
 
@@ -147,6 +147,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   }
 
   private def getFileModTime(path: Path) = {
+    // Get file mod time from cache or fetch it from the file system
     fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
   }
 
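The comment added to getFileModTime documents a cache-or-fetch pattern: return the modification time from the in-memory map if present, otherwise query the file system and remember the result. A simplified sketch of the same pattern with a plain mutable map standing in for the TimeStampedHashMap internals (the helper name fetchModTime is hypothetical):

    import scala.collection.mutable

    // Path string -> cached modification time (ms).
    val fileModTimes = new mutable.HashMap[String, Long]()

    // Stand-in for fs.getFileStatus(path).getModificationTime().
    def fetchModTime(path: String): Long = System.currentTimeMillis()

    // Hits the file system only when the path is not already cached;
    // periodic clearOldValues-style pruning keeps the real cache from growing forever.
    def getFileModTime(path: String): Long =
      fileModTimes.getOrElseUpdate(path, fetchModTime(path))
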

