spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [1/4] git commit: add javadoc to JobLogger, and some small format fix
Date Tue, 05 Nov 2013 02:21:34 GMT
Updated Branches:
  refs/heads/branch-0.8 7e00dee27 -> 518cf22eb


add javadoc to JobLogger, and some small format fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/079820f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/079820f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/079820f7

Branch: refs/heads/branch-0.8
Commit: 079820f77e0bca1e5de8c0e9641f5fd8d815c7c9
Parents: 3db505c
Author: Mingfei <mingfei.shi@intel.com>
Authored: Fri Nov 1 15:32:27 2013 +0800
Committer: Mingfei <mingfei.shi@intel.com>
Committed: Fri Nov 1 15:32:27 2013 +0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/JobLogger.scala  | 161 ++++++++++++++++---
 .../apache/spark/scheduler/JobLoggerSuite.scala |   2 +-
 2 files changed, 136 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/079820f7/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 12b0d74..0ae6a50 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -24,16 +24,19 @@ import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
 import java.util.concurrent.LinkedBlockingQueue
 
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.StorageLevel
 
 /**
- * A logger class to record runtime information for jobs in Spark. This class outputs one log file
- * per Spark job with information such as RDD graph, tasks start/stop, shuffle information.
- *
+ * <p>A logger class to record runtime information for jobs in Spark. This class outputs one log file
+ * for each Spark job, containing RDD graph, tasks start/stop, shuffle information. <br>
+ * JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext
+ * after the SparkContext is created. <br>
+ * Note that each JobLogger only works for one SparkContext
  * @param logDirName The base directory for the log files.
  */
 class JobLogger(val logDirName: String) extends SparkListener with Logging {
@@ -56,7 +59,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   private[scheduler] def getJobIDToStages = jobIDToStages
   private[scheduler] def getEventQueue = eventQueue
 
-  // Create a folder for log files, the folder's name is the creation time of the jobLogger
+  /** Create a folder for log files, the folder's name is the creation time of jobLogger */
   protected def createLogDir() {
     val dir = new File(logDir + "/" + logDirName + "/")
     if (!dir.exists() && !dir.mkdirs()) {
@@ -64,7 +67,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
   
-  // Create a log file for one job, the file name is the jobID
+  /** 
+   *  Create a log file for one job
+   *  @param jobID ID of the job
+   *  @return No return
+   *  @exception FileNotFoundException Fail to create log file
+   */
   protected def createLogWriter(jobID: Int) {
     try {
       val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
@@ -74,8 +82,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
   
-  // Close log file, and clean the stage relationship in stageIDToJobID 
-  protected def closeLogWriter(jobID: Int) = 
+  /** 
+   *  Close log file, and clean the stage relationship in stageIDToJobID
+   *  @param jobID ID of the job
+   *  @return No return
+   */
+  protected def closeLogWriter(jobID: Int) { 
     jobIDToPrintWriter.get(jobID).foreach { fileWriter => 
       fileWriter.close()
       jobIDToStages.get(jobID).foreach(_.foreach{ stage => 
@@ -84,9 +96,15 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
       jobIDToPrintWriter -= jobID
       jobIDToStages -= jobID
     }
+  }
   
-  // Write log information to log file, withTime parameter controls whether to recored 
-  // time stamp for the information
+  /** 
+   *  Write info into log file
+   *  @param jobID ID of the job
+   *  @param info Info to be recorded
+   *  @param withTime Controls whether to record time stamp before the info, default is true
+   *  @return No return
+   */
   protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
     if (withTime) {
@@ -96,9 +114,23 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
   }
   
-  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) = 
+  /** 
+   *  Write info into log file
+   *  @param stageID ID of the stage
+   *  @param info Info to be recorded
+   *  @param withTime Controls whether to record time stamp before the info, default is true
+   *  @return No return
+   */  
+  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) { 
     stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
-
+  }
+  
+  /** 
+   *  Build stage dependency for a job
+   *  @param jobID ID of the job
+   *  @param stage Root stage of the job
+   *  @return No return
+   */  
   protected def buildJobDep(jobID: Int, stage: Stage) {
     if (stage.jobId == jobID) {
       jobIDToStages.get(jobID) match {
@@ -112,6 +144,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
+  /** 
+   *  Record stage dependency and RDD dependency for a stage
+   *  @param jobID Job ID of the stage
+   *  @return No return
+   */   
   protected def recordStageDep(jobID: Int) {
     def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
       var rddList = new ListBuffer[RDD[_]]
@@ -138,8 +175,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
   
-  // Generate indents and convert to String
-  protected def indentString(indent: Int) = {
+  /** 
+   *  Generate indents and convert to String
+   *  @param indent Number of indents
+   *  @return string of indents
+   */
+  protected def indentString(indent: Int): String = {
     val sb = new StringBuilder()
     for (i <- 1 to indent) {
       sb.append(" ")
@@ -147,16 +188,35 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     sb.toString()
   }
   
-  protected def getRddName(rdd: RDD[_]) = {
-    var rddName = rdd.getClass.getName
+  /** 
+   *  Get RDD's name
+   *  @param rdd Input RDD
+   *  @return String of RDD's name
+   */
+  protected def getRddName(rdd: RDD[_]): String = {
+    var rddName = rdd.getClass.getSimpleName
     if (rdd.name != null) {
       rddName = rdd.name 
     }
     rddName
   }
   
+  /** 
+   *  Record RDD dependency graph in a stage  
+   *  @param jobID Job ID of the stage
+   *  @param rdd Root RDD of the stage
+   *  @param indent Indent number before info
+   *  @return No return
+   */
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
-    val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
+    val rddInfo = 
+      if (rdd.getStorageLevel != StorageLevel.NONE) {
+        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
+                rdd.origin + " " + rdd.generator
+      } else {
+        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
+                rdd.origin + " " + rdd.generator
+      }
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach {
       case shufDep: ShuffleDependency[_, _] =>
@@ -166,7 +226,15 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
   
-  protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
+  /** 
+   *  Record stage dependency graph of a job
+   *  @param jobID Job ID of the stage
+   *  @param stage Root stage of the job
+   *  @param indent Indent number before info, default is 0
+   *  @return No return
+   */  
+  
+  protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
     val stageInfo = if (stage.isShuffleMap) {
       "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
     } else {
@@ -174,14 +242,24 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
     if (stage.jobId == jobID) {
       jobLogInfo(jobID, indentString(indent) + stageInfo, false)
-      recordRddInStageGraph(jobID, stage.rdd, indent)
-      stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
+      if (!idSet.contains(stage.id)) {
+        idSet += stage.id
+        recordRddInStageGraph(jobID, stage.rdd, indent)
+        stage.parents.foreach(recordStageDepGraph(jobID, _, idSet, indent + 2))
+      }
     } else {
       jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
     }
   }
   
-  // Record task metrics into job log files
+  /** 
+   *  Record task metrics into job log files, including execution info and shuffle metrics
+   *  @param stageID Stage ID of the task
+   *  @param status Status info of the task
+   *  @param taskInfo Task description info
+   *  @param taskMetrics Task running metrics
+   *  @return No return
+   */
   protected def recordTaskMetrics(stageID: Int, status: String, 
                                 taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
     val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + 
@@ -206,18 +284,33 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
   }
   
+  /** 
+   *  When stage is submitted, record stage submit info
+   *  @param stageSubmitted Stage submitted event
+   *  @return No return
+   */
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
     stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
         stageSubmitted.stage.stageId, stageSubmitted.stage.numTasks))
   }
   
+  /** 
+   *  When stage is completed, record stage completion status
+   *  @param stageCompleted Stage completed event
+   *  @return No return
+   */
   override def onStageCompleted(stageCompleted: StageCompleted) {
     stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
         stageCompleted.stage.stageId))
   }
-
+  
   override def onTaskStart(taskStart: SparkListenerTaskStart) { }
-
+  
+  /** 
+   *  When task ends, record task completion status and metrics
+   *  @param taskEnd Task end event
+   *  @return No return
+   */
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     val task = taskEnd.task
     val taskInfo = taskEnd.taskInfo
@@ -246,6 +339,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
   
+  /** 
+   *  When job ends, recording job completion status and close log file
+   *  @param jobEnd Job end event
+   *  @return No return
+   */
   override def onJobEnd(jobEnd: SparkListenerJobEnd) {
     val job = jobEnd.job
     var info = "JOB_ID=" + job.jobId
@@ -259,14 +357,25 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
     closeLogWriter(job.jobId)
   }
-
+  
+  /** 
+   *  Record job properties into job log file
+   *  @param jobID ID of the job
+   *  @param properties Properties of the job
+   *  @return No return
+   */
   protected def recordJobProperties(jobID: Int, properties: Properties) {
     if(properties != null) {
       val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
       jobLogInfo(jobID, description, false)
     }
   }
-
+  
+  /** 
+   *  When job starts, record job property and stage graph
+   *  @param jobStart Job start event
+   *  @return No return
+   */
   override def onJobStart(jobStart: SparkListenerJobStart) {
     val job = jobStart.job
     val properties = jobStart.properties
@@ -274,7 +383,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     recordJobProperties(job.jobId, properties)
     buildJobDep(job.jobId, job.finalStage)
     recordStageDep(job.jobId)
-    recordStageDepGraph(job.jobId, job.finalStage)
+    recordStageDepGraph(job.jobId, job.finalStage, new HashSet[Int])
     jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/079820f7/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 8406093..7d7ca9b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -65,7 +65,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     val rootStageInfo = new StageInfo(rootStage)
 
     joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStageInfo, null))
-    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
+    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getSimpleName)
     parentRdd.setName("MyRDD")
     joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
     joblogger.createLogWriterTest(jobID)


Mime
View raw message