spark-commits mailing list archives

From pwend...@apache.org
Subject git commit: Merge pull request #88 from rxin/clean
Date Mon, 21 Oct 2013 18:59:29 GMT
Updated Branches:
  refs/heads/branch-0.8 1a50c79c3 -> 1c3f4bdb6


Merge pull request #88 from rxin/clean

Made the following traits/interfaces/classes non-public:
SparkHadoopWriter
SparkHadoopMapRedUtil
SparkHadoopMapReduceUtil
SparkHadoopUtil
PythonAccumulatorParam
BlockManagerSlaveActor
(cherry picked from commit aa61bfd39962a3e3c8ba8e4bcf25f48212933cce)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
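
For readers less familiar with Scala's access modifiers: the change relies on package-qualified
visibility, where private[spark] hides a definition from everything outside the org.apache.spark
package tree, and private[apache] does the same one level up (needed because some of these helpers
live under org.apache.hadoop.*). A minimal illustration with made-up names, not code from this
commit:

package org.apache.spark.deploy

// Visible to any code under org.apache.spark, but hidden from user applications.
private[spark] class InternalHelper {
  def onlyForSpark(): String = "callable only from org.apache.spark.* packages"
}

// Visible to any code under org.apache, which also covers helpers that must live in
// org.apache.hadoop.* packages (such as SparkHadoopMapRedUtil below).
private[apache] trait InternalTrait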


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1c3f4bdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1c3f4bdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1c3f4bdb

Branch: refs/heads/branch-0.8
Commit: 1c3f4bdb6dbdee4c04e9292d11ab852424993329
Parents: 1a50c79
Author: Patrick Wendell <pwendell@gmail.com>
Authored: Mon Oct 21 11:57:05 2013 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Mon Oct 21 11:58:22 2013 -0700

----------------------------------------------------------------------
 .../hadoop/mapred/SparkHadoopMapRedUtil.scala   |  17 ++-
 .../mapreduce/SparkHadoopMapReduceUtil.scala    |  33 +++--
 .../org/apache/spark/SparkHadoopWriter.scala    |  16 +--
 .../org/apache/spark/api/python/PythonRDD.scala |   2 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  19 +--
 .../org/apache/spark/scheduler/JobLogger.scala  | 126 +++++++++----------
 .../spark/storage/BlockManagerSlaveActor.scala  |   1 +
 7 files changed, 113 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1c3f4bdb/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
index f874600..0c47afa 100644
--- a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
@@ -17,20 +17,29 @@
 
 package org.apache.hadoop.mapred
 
+private[apache]
 trait SparkHadoopMapRedUtil {
   def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
-    val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext");
-    val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
+    val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
+      "org.apache.hadoop.mapred.JobContext")
+    val ctor = klass.getDeclaredConstructor(classOf[JobConf],
+      classOf[org.apache.hadoop.mapreduce.JobID])
     ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
   }
 
   def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
-    val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
+    val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
+      "org.apache.hadoop.mapred.TaskAttemptContext")
     val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
     ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
   }
 
-  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+  def newTaskAttemptID(
+      jtIdentifier: String,
+      jobId: Int,
+      isMap: Boolean,
+      taskId: Int,
+      attemptId: Int) = {
     new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1c3f4bdb/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
index 9318030..32429f0 100644
--- a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -17,9 +17,10 @@
 
 package org.apache.hadoop.mapreduce
 
-import org.apache.hadoop.conf.Configuration
 import java.lang.{Integer => JInteger, Boolean => JBoolean}
+import org.apache.hadoop.conf.Configuration
 
+private[apache]
 trait SparkHadoopMapReduceUtil {
   def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
     val klass = firstAvailableClass(
@@ -37,23 +38,31 @@ trait SparkHadoopMapReduceUtil {
     ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
   }
 
-  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
-    val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID");
+  def newTaskAttemptID(
+      jtIdentifier: String,
+      jobId: Int,
+      isMap: Boolean,
+      taskId: Int,
+      attemptId: Int) = {
+    val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID")
     try {
-      // first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN)
+      // First, attempt to use the old-style constructor that takes a boolean isMap
+      // (not available in YARN)
       val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
-          classOf[Int], classOf[Int])
-      ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new
-        JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+        classOf[Int], classOf[Int])
+      ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId),
+        new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
     } catch {
       case exc: NoSuchMethodException => {
-        // failed, look for the new ctor that takes a TaskType (not available in 1.x)
-        val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]]
-        val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE")
+        // If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
+        val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
+          .asInstanceOf[Class[Enum[_]]]
+        val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
+          taskTypeClass, if(isMap) "MAP" else "REDUCE")
         val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
           classOf[Int], classOf[Int])
-        ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new
-          JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+        ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
+          new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
       }
     }
   }
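
Written out statically, the constructor that the reflective fallback branch resolves to on Hadoop
2.x / YARN looks like the sketch below (illustrative object name; it only compiles against Hadoop
2.x, where TaskType exists). On Hadoop 1.x the equivalent is the boolean form,
new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId); Spark cannot reference either
form directly and still build against both lines, hence the reflection above.

import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}

object TaskAttemptIdOnHadoop2 {
  // Direct (non-reflective) equivalent of the TaskType branch in newTaskAttemptID.
  def make(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int)
      : TaskAttemptID = {
    val taskType = if (isMap) TaskType.MAP else TaskType.REDUCE
    new TaskAttemptID(jtIdentifier, jobId, taskType, taskId, attemptId)
  }
}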

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1c3f4bdb/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index afa76a4..103a1c2 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -17,14 +17,14 @@
 
 package org.apache.hadoop.mapred
 
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-
+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.text.NumberFormat
-import java.io.IOException
 import java.util.Date
 
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.Logging
 import org.apache.spark.SerializableWritable
 
@@ -36,6 +36,7 @@ import org.apache.spark.SerializableWritable
  * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
+private[apache]
 class SparkHadoopWriter(@transient jobConf: JobConf)
   extends Logging
   with SparkHadoopMapRedUtil
@@ -86,13 +87,11 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     }
 
     getOutputCommitter().setupTask(getTaskContext()) 
-    writer = getOutputFormat().getRecordWriter(
-        fs, conf.value, outputName, Reporter.NULL)
+    writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
   }
 
   def write(key: AnyRef, value: AnyRef) {
-    if (writer!=null) {
-      //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
+    if (writer != null) {
       writer.write(key, value)
     } else {
       throw new IOException("Writer is null, open() has not been called")
@@ -182,6 +181,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   }
 }
 
+private[apache]
 object SparkHadoopWriter {
   def createJobID(time: Date, id: Int): JobID = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
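
The class comment above assumes a JobConf that already carries the output key class, output value
class and output path. A minimal sketch of such a configuration, using arbitrary example types and
an example path rather than anything this commit prescribes:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

object ExampleJobConf {
  def build(): JobConf = {
    val conf = new JobConf()
    conf.setOutputKeyClass(classOf[Text])                              // output key class
    conf.setOutputValueClass(classOf[IntWritable])                     // output value class
    conf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]]) // how records are written
    FileOutputFormat.setOutputPath(conf, new Path("/tmp/example-output")) // where to write
    conf
  }
}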

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1c3f4bdb/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 1f8ad68..12b4d94 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -308,7 +308,7 @@ private class BytesToString extends org.apache.spark.api.java.function.Function[
  * Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
  * collects a list of pickled strings that we pass to Python through a socket.
  */
-class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
+private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
   extends AccumulatorParam[JList[Array[Byte]]] {
 
   Utils.checkHost(serverHost, "Expected hostname")
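
For reference, the shape of an AccumulatorParam over a list of byte arrays, stripped of the socket
handling that PythonAccumulatorParam adds on top; the class below is an illustrative stand-in, not
the class being modified above:

import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.spark.AccumulatorParam

class ByteListParam extends AccumulatorParam[JList[Array[Byte]]] {
  // The identity element: an empty list of pickled payloads.
  override def zero(initialValue: JList[Array[Byte]]): JList[Array[Byte]] =
    new JArrayList[Array[Byte]]()

  // Merge two partial results by concatenating their payloads.
  override def addInPlace(list1: JList[Array[Byte]], list2: JList[Array[Byte]]): JList[Array[Byte]] = {
    list1.addAll(list2)
    list1
  }
}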

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1c3f4bdb/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 993ba6b..83cd3df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,26 +17,31 @@
 
 package org.apache.spark.deploy
 
-import com.google.common.collect.MapMaker
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
+import com.google.common.collect.MapMaker
+
 
 /**
- * Contains util methods to interact with Hadoop from spark.
+ * Contains util methods to interact with Hadoop from Spark.
  */
+private[spark]
 class SparkHadoopUtil {
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop
-  // subsystems
+  /**
+   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
+   * subsystems.
+   */
   def newConfiguration(): Configuration = new Configuration()
 
-  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
-  // cluster
+  /**
+   * Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+   * cluster.
+   */
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }
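
The hadoopJobMetadata map above is a Guava soft-value cache: the garbage collector may reclaim
entries under memory pressure, so callers have to tolerate misses and recompute. A small
standalone sketch of the same pattern (names are illustrative, and it assumes a Guava version
where MapMaker.softValues() is still available, as it was for this branch):

import java.util.concurrent.ConcurrentMap

import com.google.common.collect.MapMaker

object SoftValueCache {
  // Values are held through soft references; the GC may drop them when memory is tight.
  private val cache: ConcurrentMap[String, AnyRef] =
    new MapMaker().softValues().makeMap[String, AnyRef]()

  def getOrCompute(key: String)(compute: => AnyRef): AnyRef = {
    val cached = cache.get(key)
    if (cached != null) {
      cached
    } else {
      val value = compute              // recompute on a miss (first access or after eviction)
      cache.putIfAbsent(key, value)
      value
    }
  }
}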

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1c3f4bdb/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 3628b1b..19c0251 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -24,56 +24,54 @@ import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
 import java.util.concurrent.LinkedBlockingQueue
 
-import scala.collection.mutable.{Map, HashMap, ListBuffer}
-import scala.io.Source
+import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
 
-// Used to record runtime information for each job, including RDD graph 
-// tasks' start/stop shuffle information and information from outside
-
+/**
+ * A logger class to record runtime information for jobs in Spark. This class outputs one log file
+ * per Spark job with information such as RDD graph, tasks start/stop, shuffle information.
+ *
+ * @param logDirName The base directory for the log files.
+ */
 class JobLogger(val logDirName: String) extends SparkListener with Logging {
-  private val logDir =  
-    if (System.getenv("SPARK_LOG_DIR") != null)  
-      System.getenv("SPARK_LOG_DIR")
-    else 
-      "/tmp/spark"
+
+  private val logDir = Option(System.getenv("SPARK_LOG_DIR")).getOrElse("/tmp/spark")
+
   private val jobIDToPrintWriter = new HashMap[Int, PrintWriter] 
   private val stageIDToJobID = new HashMap[Int, Int]
   private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
   private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
-  
+
   createLogDir()
   def this() = this(String.valueOf(System.currentTimeMillis()))
-  
-  def getLogDir = logDir
-  def getJobIDtoPrintWriter = jobIDToPrintWriter
-  def getStageIDToJobID = stageIDToJobID
-  def getJobIDToStages = jobIDToStages
-  def getEventQueue = eventQueue
-  
+
+  // The following 5 functions are used only in testing.
+  private[scheduler] def getLogDir = logDir
+  private[scheduler] def getJobIDtoPrintWriter = jobIDToPrintWriter
+  private[scheduler] def getStageIDToJobID = stageIDToJobID
+  private[scheduler] def getJobIDToStages = jobIDToStages
+  private[scheduler] def getEventQueue = eventQueue
+
   // Create a folder for log files, the folder's name is the creation time of the jobLogger
   protected def createLogDir() {
     val dir = new File(logDir + "/" + logDirName + "/")
-    if (dir.exists()) {
-      return
-    }
-    if (dir.mkdirs() == false) {
-      logError("create log directory error:" + logDir + "/" + logDirName + "/")
+    if (!dir.exists() && !dir.mkdirs()) {
+      logError("Error creating log directory: " + logDir + "/" + logDirName + "/")
     }
   }
   
   // Create a log file for one job, the file name is the jobID
   protected def createLogWriter(jobID: Int) {
-    try{
+    try {
       val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
       jobIDToPrintWriter += (jobID -> fileWriter)
-      } catch {
-        case e: FileNotFoundException => e.printStackTrace()
-      }
+    } catch {
+      case e: FileNotFoundException => e.printStackTrace()
+    }
   }
   
   // Close log file, and clean the stage relationship in stageIDToJobID 
@@ -118,10 +116,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
       var rddList = new ListBuffer[RDD[_]]
       rddList += rdd
-      rdd.dependencies.foreach{ dep => dep match {
-          case shufDep: ShuffleDependency[_,_] =>
-          case _ => rddList ++= getRddsInStage(dep.rdd)
-        }
+      rdd.dependencies.foreach {
+        case shufDep: ShuffleDependency[_, _] =>
+        case dep: Dependency[_] => rddList ++= getRddsInStage(dep.rdd)
       }
       rddList
     }
@@ -161,29 +158,27 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
     val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
-    rdd.dependencies.foreach{ dep => dep match {
-        case shufDep: ShuffleDependency[_,_] => 
-          val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
-          jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
-        case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
-      }
+    rdd.dependencies.foreach {
+      case shufDep: ShuffleDependency[_, _] =>
+        val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
+        jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
+      case dep: Dependency[_] => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
     }
   }
   
   protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
-    var stageInfo: String = ""
-    if (stage.isShuffleMap) {
-      stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + 
-                  stage.shuffleDep.get.shuffleId
-    }else{
-      stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
+    val stageInfo = if (stage.isShuffleMap) {
+      "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
+    } else {
+      "STAGE_ID=" + stage.id + " RESULT_STAGE"
     }
     if (stage.jobId == jobID) {
       jobLogInfo(jobID, indentString(indent) + stageInfo, false)
       recordRddInStageGraph(jobID, stage.rdd, indent)
       stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
-    } else
+    } else {
       jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
+    }
   }
   
   // Record task metrics into job log files
@@ -193,39 +188,32 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
                " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
                " EXECUTOR_ID=" + taskInfo.executorId +  " HOST=" + taskMetrics.hostname
     val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
-    val readMetrics = 
-      taskMetrics.shuffleReadMetrics match {
-        case Some(metrics) => 
-          " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + 
-          " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + 
-          " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + 
-          " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + 
-          " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + 
-          " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + 
-          " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
-        case None => ""
-      }
-    val writeMetrics = 
-      taskMetrics.shuffleWriteMetrics match {
-        case Some(metrics) => 
-          " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
-        case None => ""
-      }
+    val readMetrics = taskMetrics.shuffleReadMetrics match {
+      case Some(metrics) =>
+        " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
+        " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
+        " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
+        " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
+        " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
+        " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
+        " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
+      case None => ""
+    }
+    val writeMetrics = taskMetrics.shuffleWriteMetrics match {
+      case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
+      case None => ""
+    }
     stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
   }
   
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
-    stageLogInfo(
-      stageSubmitted.stage.id,
-      "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
-        stageSubmitted.stage.id, stageSubmitted.taskSize))
+    stageLogInfo(stageSubmitted.stage.id, "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
+      stageSubmitted.stage.id, stageSubmitted.taskSize))
   }
   
   override def onStageCompleted(stageCompleted: StageCompleted) {
-    stageLogInfo(
-      stageCompleted.stageInfo.stage.id,
+    stageLogInfo(stageCompleted.stageInfo.stage.id,
       "STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))
-    
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) { }
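
A brief usage sketch of the class documented above: each JobLogger instance owns one directory,
resolved as $SPARK_LOG_DIR (or /tmp/spark) plus the name passed to the constructor, and writes one
file per job ID once the listener is registered with a SparkContext (registration is assumed here
and not shown in this diff):

import org.apache.spark.scheduler.JobLogger

object JobLoggerExample {
  // Creates <SPARK_LOG_DIR or /tmp/spark>/my-job-logs/ on construction; one log file
  // per job ID is added as jobs are submitted, once the listener is registered.
  val logger = new JobLogger("my-job-logs")
}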

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1c3f4bdb/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index 9515030..3a65e55 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -26,6 +26,7 @@ import org.apache.spark.storage.BlockManagerMessages._
  * An actor to take commands from the master to execute options. For example,
  * this is used to remove blocks from the slave's BlockManager.
  */
+private[storage]
 class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
   override def receive = {
 

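
The comment above describes a command-handling actor. A generic, minimal sketch of that shape in
Akka, with a made-up message type and store standing in for BlockManagerMessages and the real
BlockManager:

import scala.collection.mutable

import akka.actor.Actor

// Hypothetical command message; the real slave actor handles BlockManagerMessages
// such as block-removal requests from the master.
case class RemoveItem(id: String)

class SlaveLikeActor(store: mutable.Map[String, Array[Byte]]) extends Actor {
  def receive = {
    case RemoveItem(id) =>
      store.remove(id)   // act on the master's command against local state
      sender ! true      // acknowledge completion back to the requester
  }
}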
