spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [14/20] git commit: Removed spark.hostPort and other setting from SparkConf before saving to checkpoint.
Date Sat, 11 Jan 2014 00:26:08 GMT
Removed spark.hostPort and other setting from SparkConf before saving to checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4f609f79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4f609f79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4f609f79

Branch: refs/heads/master
Commit: 4f609f79015732a91a83c5625d357c4edfc7c962
Parents: d7ec73a
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Fri Jan 10 12:58:07 2014 +0000
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Fri Jan 10 12:58:07 2014 +0000

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala | 24 +++++---------------
 1 file changed, 6 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4f609f79/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 7366d8a..62b2253 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,6 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val pendingTimes = ssc.scheduler.getPendingTimes()
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConf = ssc.conf
+  
+  // do not save these configurations
+  sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
@@ -73,11 +76,7 @@ object Checkpoint extends Logging {
     def sortFunc(path1: Path, path2: Path): Boolean = {
      val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
      val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
-      logInfo("Path 1: " + path1 + " -> " + time1 + ", " + bk1)
-      logInfo("Path 2: " + path2 + " -> " + time2 + ", " + bk2)
-      val precede = (time1 < time2) || (time1 == time2 && bk1) 
-      logInfo(precede.toString)
-      precede
+      (time1 < time2) || (time1 == time2 && bk1) 
     }
 
     val path = new Path(checkpointDir)
@@ -85,12 +84,8 @@ object Checkpoint extends Logging {
       val statuses = fs.listStatus(path)
       if (statuses != null) {
         val paths = statuses.map(_.getPath)
-        logInfo("Paths = " + paths.map(_.getName).mkString(", "))
         val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
-        logInfo("Filtered paths = " + filtered.map(_.getName).mkString(", "))
-        val sorted = filtered.sortWith(sortFunc)
-        logInfo("Sorted paths = " + sorted.map(_.getName).mkString(", "))
-        sorted
+        filtered.sortWith(sortFunc)
       } else {
         logWarning("Listing " + path + " returned null")
         Seq.empty
@@ -112,16 +107,9 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
   val MAX_ATTEMPTS = 3
   val executor = Executors.newFixedThreadPool(1)
   val compressionCodec = CompressionCodec.createCodec(conf)
-  // The file to which we actually write - and then "move" to file
-  // val writeFile = new Path(file.getParent, file.getName + ".next")
-  // The file to which existing checkpoint is backed up (i.e. "moved")
-  // val bakFile = new Path(file.getParent, file.getName + ".bk")
-
   private var stopped = false
   private var fs_ : FileSystem = _
 
-  // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
-  // I did not notice any errors - reintroduce it ?
  class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
     def run() {
       var attempts = 0
@@ -189,7 +177,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
     bos.close()
     try {
       executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
-      logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
+      logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
     } catch {
       case rej: RejectedExecutionException =>
         logError("Could not submit checkpoint task to the thread pool executor", rej)


Mime
View raw message