spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [2/3] Merge pull request #557 from ScrapCodes/style. Closes #557.
Date Sun, 09 Feb 2014 18:09:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 09d0a81..56c7777 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -39,7 +39,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
   override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt
 
   override def equals(other: Any): Boolean = other match {
-    case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId && this.slice == that.slice)
+    case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId &&
+            this.slice == that.slice)
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 4c625d0..f436432 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -23,8 +23,8 @@ import org.apache.spark.{TaskContext, OneToOneDependency, SparkContext, Partitio
 
 
 /**
- * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of corresponding partitions
- * of parent RDDs.
+ * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of
+ * corresponding partitions of parent RDDs.
  */
 private[spark]
 class PartitionerAwareUnionRDDPartition(

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index bc68811..73e8769 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -35,10 +35,10 @@ private[spark] object CheckpointState extends Enumeration {
 }
 
 /**
- * This class contains all the information related to RDD checkpointing. Each instance of this class
- * is associated with a RDD. It manages process of checkpointing of the associated RDD, as well as,
- * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
- * of the checkpointed RDD.
+ * This class contains all the information related to RDD checkpointing. Each instance of this
+ * class is associated with a RDD. It manages process of checkpointing of the associated RDD,
+ * as well as, manages the post-checkpoint state by providing the updated partitions,
+ * iterator and preferred locations of the checkpointed RDD.
  */
 private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
   extends Logging with Serializable {
@@ -97,7 +97,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
     val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
     if (newRDD.partitions.size != rdd.partitions.size) {
       throw new SparkException(
-        "Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
+        "Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
           "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 2d1bd5b..c9b4c76 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -71,7 +71,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
     val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
     val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
 
-    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
+      valueClass.getSimpleName + ")" )
     val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
     val jobConf = new JobConf(self.context.hadoopConfiguration)
     if (!convertKey && !convertValue) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 21d16fa..8021154 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1082,8 +1082,9 @@ class DAGScheduler(
       case n: NarrowDependency[_] =>
         for (inPart <- n.getParents(partition)) {
           val locs = getPreferredLocs(n.rdd, inPart)
-          if (locs != Nil)
+          if (locs != Nil) {
             return locs
+          }
         }
       case _ =>
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index cc10cc0..23447f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConversions._
  * Parses and holds information about inputFormat (and files) specified as a parameter.
  */
 class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
-                      val path: String) extends Logging {
+    val path: String) extends Logging {
 
   var mapreduceInputFormat: Boolean = false
   var mapredInputFormat: Boolean = false
@@ -41,7 +41,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
   validate()
 
   override def toString: String = {
-    "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", path : " + path
+    "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", " +
+      "path : " + path
   }
 
   override def hashCode(): Int = {
@@ -50,8 +51,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
     hashCode
   }
 
-  // Since we are not doing canonicalization of path, this can be wrong : like relative vs absolute path
-  // .. which is fine, this is best case effort to remove duplicates - right ?
+  // Since we are not doing canonicalization of path, this can be wrong : like relative vs
+  // absolute path .. which is fine, this is best case effort to remove duplicates - right ?
   override def equals(other: Any): Boolean = other match {
     case that: InputFormatInfo => {
       // not checking config - that should be fine, right ?
@@ -65,22 +66,26 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
     logDebug("validate InputFormatInfo : " + inputFormatClazz + ", path  " + path)
 
     try {
-      if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
+      if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(
+        inputFormatClazz)) {
         logDebug("inputformat is from mapreduce package")
         mapreduceInputFormat = true
       }
-      else if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
+      else if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(
+        inputFormatClazz)) {
         logDebug("inputformat is from mapred package")
         mapredInputFormat = true
       }
       else {
         throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
-          " is NOT a supported input format ? does not implement either of the supported hadoop api's")
+          " is NOT a supported input format ? does not implement either of the supported hadoop " +
+            "api's")
       }
     }
     catch {
       case e: ClassNotFoundException => {
-        throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz + " cannot be found ?", e)
+        throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
+          " cannot be found ?", e)
       }
     }
   }
@@ -125,8 +130,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
    }
 
   private def findPreferredLocations(): Set[SplitInfo] = {
-    logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
-      ", inputFormatClazz : " + inputFormatClazz)
+    logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " +
+      mapredInputFormat + ", inputFormatClazz : " + inputFormatClazz)
     if (mapreduceInputFormat) {
       prefLocsFromMapreduceInputFormat()
     }
@@ -150,8 +155,8 @@ object InputFormatInfo {
     c) Compute rack info for each host and update rack -> count map based on (b).
     d) Allocate nodes based on (c)
     e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node
-       (even if data locality on that is very high) : this is to prevent fragility of job if a single
-       (or small set of) hosts go down.
+       (even if data locality on that is very high) : this is to prevent fragility of job if a
+       single (or small set of) hosts go down.
 
     go to (a) until required nodes are allocated.
 
@@ -159,7 +164,8 @@ object InputFormatInfo {
 
     PS: I know the wording here is weird, hopefully it makes some sense !
   */
-  def computePreferredLocations(formats: Seq[InputFormatInfo]): HashMap[String, HashSet[SplitInfo]] = {
+  def computePreferredLocations(formats: Seq[InputFormatInfo]): HashMap[String, HashSet[SplitInfo]]
+  = {
 
     val nodeToSplit = new HashMap[String, HashSet[SplitInfo]]
     for (inputSplit <- formats) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index f8fa5a9..b909b66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -45,10 +45,11 @@ class JobLogger(val user: String, val logDirName: String)
     String.valueOf(System.currentTimeMillis()))
 
   private val logDir =
-    if (System.getenv("SPARK_LOG_DIR") != null)
+    if (System.getenv("SPARK_LOG_DIR") != null) {
       System.getenv("SPARK_LOG_DIR")
-    else
+    } else {
       "/tmp/spark-%s".format(user)
+    }
 
   private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
   private val stageIDToJobID = new HashMap[Int, Int]
@@ -116,7 +117,7 @@ class JobLogger(val user: String, val logDirName: String)
     var writeInfo = info
     if (withTime) {
       val date = new Date(System.currentTimeMillis())
-      writeInfo = DATE_FORMAT.format(date) + ": " +info
+      writeInfo = DATE_FORMAT.format(date) + ": " + info
     }
     jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
   }
@@ -235,7 +236,8 @@ class JobLogger(val user: String, val logDirName: String)
    * @param stage Root stage of the job
    * @param indent Indent number before info, default is 0
    */
-  protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
+  protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0)
+  {
     val stageInfo = if (stage.isShuffleMap) {
       "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
index c381348..d94f6ad 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -23,4 +23,5 @@ package org.apache.spark.scheduler
 private[spark] sealed trait JobResult
 
 private[spark] case object JobSucceeded extends JobResult
-private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage]) extends JobResult
+private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage])
+  extends JobResult

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 28f3ba5..0544f81 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -36,7 +36,8 @@ private[spark] object ResultTask {
   val metadataCleaner = new MetadataCleaner(
     MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues, new SparkConf)
 
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
+  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _)
+  : Array[Byte] = {
     synchronized {
       val old = serializedInfoCache.get(stageId).orNull
       if (old != null) {
@@ -55,7 +56,8 @@ private[spark] object ResultTask {
     }
   }
 
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = {
+  def deserializeInfo(stageId: Int, bytes: Array[Byte])
+  : (RDD[_], (TaskContext, Iterator[_]) => _) = {
     val loader = Thread.currentThread.getContextClassLoader
     val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
     val ser = SparkEnv.get.closureSerializer.newInstance()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 3cf995e..a546193 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -148,6 +148,6 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
       }
     }
     parentPool.addSchedulable(manager)
-    logInfo("Added task set " + manager.name + " tasks to pool "+poolName)
+    logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d8e97c3..d25f0a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -37,8 +37,8 @@ case class SparkListenerTaskGettingResult(
 case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
      taskMetrics: TaskMetrics) extends SparkListenerEvents
 
-case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], properties: Properties = null)
-     extends SparkListenerEvents
+case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int],
+    properties: Properties = null) extends SparkListenerEvents
 
 case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
      extends SparkListenerEvents
@@ -99,11 +99,14 @@ class StatsReportListener extends SparkListener with Logging {
     showMillisDistribution("task runtime:", (info, _) => Some(info.duration))
 
     //shuffle write
-    showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
+    showBytesDistribution("shuffle bytes written:",
+      (_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
 
     //fetch & io
-    showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
-    showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
+    showMillisDistribution("fetch wait time:",
+      (_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
+    showBytesDistribution("remote bytes read:",
+      (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
     showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
 
     //runtime breakdown
@@ -111,8 +114,10 @@ class StatsReportListener extends SparkListener with Logging {
     val runtimePcts = stageCompleted.stage.taskInfos.map{
       case (info, metrics) => RuntimePercentage(info.duration, metrics)
     }
-    showDistribution("executor (non-fetch) time pct: ", Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
-    showDistribution("fetch wait time pct: ", Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
+    showDistribution("executor (non-fetch) time pct: ",
+      Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
+    showDistribution("fetch wait time pct: ",
+      Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
     showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
   }
 
@@ -147,7 +152,8 @@ private[spark] object StatsReportListener extends Logging {
     logInfo("\t" + quantiles.mkString("\t"))
   }
 
-  def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String) {
+  def showDistribution(heading: String,
+      dOpt: Option[Distribution], formatNumber: Double => String) {
     dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
   }
 
@@ -156,7 +162,8 @@ private[spark] object StatsReportListener extends Logging {
     showDistribution(heading, dOpt, f _)
   }
 
-  def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
+  def showDistribution(heading:String, format: String,
+      getMetric: (TaskInfo,TaskMetrics) => Option[Double])
     (implicit stage: SparkListenerStageCompleted) {
     showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
   }
@@ -175,7 +182,8 @@ private[spark] object StatsReportListener extends Logging {
   }
 
   def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
-    showDistribution(heading, dOpt, (d => StatsReportListener.millisToString(d.toLong)): Double => String)
+    showDistribution(heading, dOpt,
+      (d => StatsReportListener.millisToString(d.toLong)): Double => String)
   }
 
   def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
@@ -212,7 +220,7 @@ private object RuntimePercentage {
     val denom = totalTime.toDouble
     val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
     val fetch = fetchTime.map{_ / denom}
-    val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom
+    val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
     val other = 1.0 - (exec + fetch.getOrElse(0d))
     RuntimePercentage(exec, fetch, other)
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 520c0b2..a78b018 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -63,8 +63,9 @@ private[spark] class Stage(
   def addOutputLoc(partition: Int, status: MapStatus) {
     val prevList = outputLocs(partition)
     outputLocs(partition) = status :: prevList
-    if (prevList == Nil)
+    if (prevList == Nil) {
       numAvailableOutputs += 1
+    }
   }
 
   def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index c4d1ad5..8f320e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -29,7 +29,8 @@ import org.apache.spark.executor.TaskMetrics
  */
 class StageInfo(
     stage: Stage,
-    val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+    val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] =
+    mutable.Buffer[(TaskInfo, TaskMetrics)]()
 ) {
   val stageId = stage.id
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 3c22edd..91c27d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -70,16 +70,17 @@ class TaskInfo(
   def running: Boolean = !finished
 
   def status: String = {
-    if (running)
+    if (running) {
       "RUNNING"
-    else if (gettingResult)
+    } else if (gettingResult) {
       "GET RESULT"
-    else if (failed)
+    } else if (failed) {
       "FAILED"
-    else if (successful)
+    } else if (successful) {
       "SUCCESS"
-    else
+    } else {
       "UNKNOWN"
+    }
   }
 
   def duration: Long = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 9d3e615..5724ec9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -35,7 +35,8 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
 
 /** A TaskResult that contains the task's return value and accumulator updates. */
 private[spark]
-class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any],
+    var metrics: TaskMetrics)
   extends TaskResult[T] with Externalizable {
 
   def this() = this(null.asInstanceOf[ByteBuffer], null, null)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 17b6d97..1cdfed1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -36,7 +36,8 @@ private[spark] trait TaskScheduler {
   def start(): Unit
 
   // Invoked after system has successfully initialized (typically in spark context).
-  // Yarn uses this to bootstrap allocation of resources based on preferred locations, wait for slave registerations, etc.
+  // Yarn uses this to bootstrap allocation of resources based on preferred locations,
+  // wait for slave registerations, etc.
   def postStartHook() { }
 
   // Disconnect from the cluster.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3f0ee7a..21b2ff1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -80,7 +80,7 @@ private[spark] class TaskSetManager(
   var minShare = 0
   var priority = taskSet.priority
   var stageId = taskSet.stageId
-  var name = "TaskSet_"+taskSet.stageId.toString
+  var name = "TaskSet_" + taskSet.stageId.toString
   var parent: Pool = null
 
   val runningTasksSet = new HashSet[Long]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0208388..7820410 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -120,7 +120,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         sender ! true
 
       case DisassociatedEvent(_, address, _) =>
-        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
+        addressToExecutorId.get(address).foreach(removeExecutor(_,
+          "remote Akka client disassociated"))
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 33aac52..04f35cc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -51,8 +51,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome()
-    val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
-        "http://" + sc.ui.appUIAddress)
+    val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command,
+      sparkHome, "http://" + sc.ui.appUIAddress)
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
@@ -84,7 +84,8 @@ private[spark] class SparkDeploySchedulerBackend(
     }
   }
 
-  override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
+  override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
+    memory: Int) {
     logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index c27049b..4401f6d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -136,7 +136,8 @@ private[spark] class CoarseMesosSchedulerBackend(
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
-        "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
+        ("cd %s*; " +
+          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
           .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c14cd47..2d0b255 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -60,7 +60,8 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial
     try {
       for (regCls <- conf.getOption("spark.kryo.registrator")) {
         logDebug("Running user registrator: " + regCls)
-        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+        val reg = Class.forName(regCls, true, classLoader).newInstance()
+          .asInstanceOf[KryoRegistrator]
         reg.registerClasses(kryo)
       }
     } catch {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 9a5e3cb..a38a2b5 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -27,11 +27,12 @@ import org.apache.spark.util.{NextIterator, ByteBufferInputStream}
 
 /**
  * A serializer. Because some serialization libraries are not thread safe, this class is used to
- * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual serialization and are
- * guaranteed to only be called from one thread at a time.
+ * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual
+ * serialization and are guaranteed to only be called from one thread at a time.
  *
  * Implementations of this trait should have a zero-arg constructor or a constructor that accepts a
- * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes precedence.
+ * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes
+ * precedence.
  */
 trait Serializer {
   def newInstance(): SerializerInstance

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 4fa2ab9..aa62ab5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -76,9 +76,9 @@ object BlockFetcherIterator {
 
     import blockManager._
 
-    private var _remoteBytesRead = 0l
-    private var _remoteFetchTime = 0l
-    private var _fetchWaitTime = 0l
+    private var _remoteBytesRead = 0L
+    private var _remoteFetchTime = 0L
+    private var _fetchWaitTime = 0L
 
     if (blocksByAddress == null) {
       throw new IllegalArgumentException("BlocksByAddress is null")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ed53558..542deb9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -206,8 +206,9 @@ private[spark] class BlockManager(
    * message reflecting the current status, *not* the desired storage level in its block info.
    * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
    *
-   * droppedMemorySize exists to account for when block is dropped from memory to disk (so it is still valid).
-   * This ensures that update in master will compensate for the increase in memory on slave.
+   * droppedMemorySize exists to account for when block is dropped from memory to disk (so it
+   * is still valid). This ensures that update in master will compensate for the increase in
+   * memory on slave.
    */
   def reportBlockStatus(blockId: BlockId, info: BlockInfo, droppedMemorySize: Long = 0L) {
     val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize)
@@ -224,7 +225,8 @@ private[spark] class BlockManager(
    * which will be true if the block was successfully recorded and false if
    * the slave needs to re-register.
    */
-  private def tryToReportBlockStatus(blockId: BlockId, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {
+  private def tryToReportBlockStatus(blockId: BlockId, info: BlockInfo,
+      droppedMemorySize: Long = 0L): Boolean = {
     val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
       info.level match {
         case null =>
@@ -282,14 +284,15 @@ private[spark] class BlockManager(
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
     if (blockId.isShuffle) {
-      return diskStore.getBytes(blockId) match {
+      diskStore.getBytes(blockId) match {
         case Some(bytes) =>
           Some(bytes)
         case None =>
           throw new Exception("Block " + blockId + " not found on disk, though it should be")
       }
+    } else {
+        doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
     }
-    doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
   }
 
   private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
@@ -701,7 +704,8 @@ private[spark] class BlockManager(
               diskStore.putBytes(blockId, bytes, level)
           }
         }
-        val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
+        val droppedMemorySize =
+          if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
         val blockWasRemoved = memoryStore.remove(blockId)
         if (!blockWasRemoved) {
           logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 2c1a4e2..893418f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -61,8 +61,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
   override def preStart() {
     if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       import context.dispatcher
-      timeoutCheckingTask = context.system.scheduler.schedule(
-        0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+      timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+        checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
     }
     super.preStart()
   }
@@ -169,8 +169,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
     val toRemove = new mutable.HashSet[BlockManagerId]
     for (info <- blockManagerInfo.values) {
       if (info.lastSeenMs < minSeenTime) {
-        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
-          (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
+        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+          + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
         toRemove += info.blockManagerId
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 365866d..7cf754f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -57,9 +57,9 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val diskSpaceUsed = storageStatusList
-      	.flatMap(_.blocks.values.map(_.diskSize))
-      	.reduceOption(_ + _)
-      	.getOrElse(0L)
+        .flatMap(_.blocks.values.map(_.diskSize))
+        .reduceOption(_ + _)
+        .getOrElse(0L)
 
       diskSpaceUsed / 1024 / 1024
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index 5932936..5ded9ab 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -25,7 +25,8 @@ import org.apache.spark._
 import org.apache.spark.network._
 
 private[spark]
-class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {
+class BlockMessageArray(var blockMessages: Seq[BlockMessage])
+    extends Seq[BlockMessage] with Logging {
   
   def this(bm: BlockMessage) = this(Array(bm))
 
@@ -65,7 +66,8 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
       buffer.position(buffer.position() + size)
     }
     val finishTime = System.currentTimeMillis
-    logDebug("Converted block message array from buffer message in " + (finishTime - startTime) / 1000.0  + " s")
+    logDebug("Converted block message array from buffer message in " +
+      (finishTime - startTime) / 1000.0  + " s")
     this.blockMessages = newBlockMessages 
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 1720007..50a0cdb 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -25,15 +25,15 @@ private[spark]
 case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
   blocks: Map[BlockId, BlockStatus]) {
 
-  def memUsed() = blocks.values.map(_.memSize).reduceOption(_+_).getOrElse(0L)
+  def memUsed() = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
 
   def memUsedByRDD(rddId: Int) =
-    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_+_).getOrElse(0L)
+    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
 
-  def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_+_).getOrElse(0L)
+  def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
 
   def diskUsedByRDD(rddId: Int) =
-    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_+_).getOrElse(0L)
+    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
 
   def memRemaining : Long = maxMem - memUsed()
 
@@ -48,8 +48,9 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
   extends Ordered[RDDInfo] {
   override def toString = {
     import Utils.bytesToString
-    "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
-      storageLevel.toString, numCachedPartitions, numPartitions, bytesToString(memSize), bytesToString(diskSize))
+    ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
+       "DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions,
+         numPartitions, bytesToString(memSize), bytesToString(diskSize))
   }
 
   override def compare(that: RDDInfo) = {
@@ -64,7 +65,8 @@ object StorageUtils {
   /* Returns RDD-level information, compiled from a list of StorageStatus objects */
   def rddInfoFromStorageStatus(storageStatusList: Seq[StorageStatus],
     sc: SparkContext) : Array[RDDInfo] = {
-    rddInfoFromBlockStatusList(storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc)
+    rddInfoFromBlockStatusList(
+      storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc)
   }
 
   /* Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */
@@ -91,7 +93,8 @@ object StorageUtils {
       sc.persistentRdds.get(rddId).map { r =>
         val rddName = Option(r.name).getOrElse(rddId.toString)
         val rddStorageLevel = r.getStorageLevel
-        RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize, diskSize)
+        RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size,
+          memSize, diskSize)
       }
     }.flatten.toArray
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 5573b38..b95c8f4 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -48,14 +48,16 @@ private[spark] object UIUtils {
       case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
     }
     val executors = page match {
-      case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
+      case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a>
+      </li>
       case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
     }
 
     <html>
       <head>
         <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
-        <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+        <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")}
+              type="text/css" />
         <link rel="stylesheet" href={prependBaseUri("/static/webui.css")}  type="text/css" />
         <script src={prependBaseUri("/static/sorttable.js")} ></script>
         <title>{sc.appName} - {title}</title>
@@ -63,7 +65,8 @@ private[spark] object UIUtils {
       <body>
         <div class="navbar navbar-static-top">
           <div class="navbar-inner">
-            <a href={prependBaseUri("/")} class="brand"><img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}  /></a>
+            <a href={prependBaseUri("/")} class="brand">
+                <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}  /></a>
             <ul class="nav">
               {jobs}
               {storage}
@@ -93,7 +96,8 @@ private[spark] object UIUtils {
     <html>
       <head>
         <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
-        <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+        <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")}
+              type="text/css" />
         <link rel="stylesheet" href={prependBaseUri("/static/webui.css")}  type="text/css" />
         <script src={prependBaseUri("/static/sorttable.js")} ></script>
         <title>{title}</title>
@@ -103,7 +107,8 @@ private[spark] object UIUtils {
           <div class="row-fluid">
             <div class="span12">
               <h3 style="vertical-align: middle; display: inline-block;">
-                <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} style="margin-right: 15px;" />
+                <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}
+                     style="margin-right: 15px;" />
                 {title}
               </h3>
             </div>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 6ba1518..f913ee4 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -36,7 +36,8 @@ private[spark] object UIWorkloadGenerator {
 
   def main(args: Array[String]) {
     if (args.length < 2) {
-      println("usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+      println(
+        "usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
       System.exit(1)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index a31a7e1..4e41acf 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -51,9 +51,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
   def render(request: HttpServletRequest): Seq[Node] = {
     val storageStatusList = sc.getExecutorStorageStatus
 
-    val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_+_)
-    val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_+_)
-    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
+    val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
+    val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
+    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
 
     val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
       "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index ca5a286..6289f87 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -43,7 +43,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
       }
 
       val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
-      val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
+      val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse,
+        parent)
       val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
 
       val pools = listener.sc.getAllPools

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index cfeeccd..9412a48 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -60,7 +60,10 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
       case None => 0
     }
     <tr>
-      <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>{p.name}</a></td>
+      <td>
+          <a href=
+             {"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>
+              {p.name}</a></td>
       <td>{p.minShare}</td>
       <td>{p.weight}</td>
       <td>{activeStages}</td>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index cfaf121..08107a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -64,7 +64,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
       listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
 
       val finishedTasks = listener.stageIdToTaskInfos(stageId).filter(_._1.finished)
-
+      // scalastyle:off
       val summary =
         <div>
           <ul class="unstyled">
@@ -96,7 +96,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
             }
           </ul>
         </div>
-
+        // scalastyle:on
       val taskHeaders: Seq[String] =
         Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
         Seq("Duration", "GC Time", "Result Ser Time") ++
@@ -105,7 +105,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
         {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
         Seq("Errors")
 
-      val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
+      val taskTable = listingTable(
+        taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
 
       // Excludes tasks which failed and have incomplete metrics
       val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@@ -117,8 +118,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
         else {
           val serializationTimes = validTasks.map{case (info, metrics, exception) =>
             metrics.get.resultSerializationTime.toDouble}
-          val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
-            ms => parent.formatDuration(ms.toLong))
+          val serializationQuantiles =
+            "Result serialization time" +: Distribution(serializationTimes).
+              get.getQuantiles().map(ms => parent.formatDuration(ms.toLong))
 
           val serviceTimes = validTasks.map{case (info, metrics, exception) =>
             metrics.get.executorRunTime.toDouble}
@@ -225,7 +227,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
     val shuffleReadReadable = maybeShuffleRead.map{Utils.bytesToString(_)}.getOrElse("")
 
-    val maybeShuffleWrite = metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten}
+    val maybeShuffleWrite =
+      metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten}
     val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("")
     val shuffleWriteReadable = maybeShuffleWrite.map{Utils.bytesToString(_)}.getOrElse("")
 
@@ -236,7 +239,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
 
     val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
     val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
-    val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+    val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}
+      .getOrElse("")
 
     val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
     val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9ad6de3..01b6479 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -59,7 +59,8 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
     </table>
   }
 
-  private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
+  private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int)
+  : Seq[Node] = {
     val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
     val startWidth = "width: %s%%".format((started.toDouble/total)*100)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 1df6b87..3eb0f08 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.Set
 
 import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
 import org.objectweb.asm.Opcodes._
-import java.io.{InputStream, IOException, ByteArrayOutputStream, ByteArrayInputStream, BufferedInputStream}
+import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
 import org.apache.spark.Logging
 
 private[spark] object ClosureCleaner extends Logging {
@@ -159,8 +159,9 @@ private[spark] object ClosureCleaner extends Logging {
       // other than to set its fields, so use its constructor
       val cons = cls.getConstructors()(0)
       val params = cons.getParameterTypes.map(createNullValue).toArray
-      if (outer != null)
+      if (outer != null) {
         params(0) = outer // First param is always outer object
+      }
       return cons.newInstance(params: _*).asInstanceOf[AnyRef]
     } else {
       // Use reflection to instantiate object without calling constructor
@@ -179,7 +180,8 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
-private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
+private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]])
+    extends ClassVisitor(ASM4) {
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {
     new MethodVisitor(ASM4) {
@@ -221,11 +223,12 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi
         val argTypes = Type.getArgumentTypes(desc)
         if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
             && argTypes(0).toString.startsWith("L") // is it an object?
-            && argTypes(0).getInternalName == myName)
+            && argTypes(0).getInternalName == myName) {
           output += Class.forName(
               owner.replace('/', '.'),
               false,
               Thread.currentThread.getContextClassLoader)
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
index fcc1ca9..b6a0998 100644
--- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
@@ -21,7 +21,10 @@ package org.apache.spark.util
  * Wrapper around an iterator which calls a completion method after it successfully iterates
  * through all the elements.
  */
-private[spark] abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
+private[spark]
+// scalastyle:off
+abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I) extends Iterator[A] {
+// scalastyle:on
   def next() = sub.next()
   def hasNext = {
     val r = sub.hasNext

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/Distribution.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Distribution.scala b/core/src/main/scala/org/apache/spark/util/Distribution.scala
index 33bf356..ab738c4 100644
--- a/core/src/main/scala/org/apache/spark/util/Distribution.scala
+++ b/core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -20,7 +20,8 @@ package org.apache.spark.util
 import java.io.PrintStream
 
 /**
- * Util for getting some stats from a small sample of numeric values, with some handy summary functions.
+ * Util for getting some stats from a small sample of numeric values, with some handy
+ * summary functions.
  *
  * Entirely in memory, not intended as a good way to compute stats over large data sets.
  *
@@ -68,10 +69,11 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int)
 object Distribution {
 
   def apply(data: Traversable[Double]): Option[Distribution] = {
-    if (data.size > 0)
+    if (data.size > 0) {
       Some(new Distribution(data))
-    else
+    } else {
       None
+    }
   }
 
   def showQuantiles(out: PrintStream = System.out, quantiles: Traversable[Double]) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index b0febe9..3868ab3 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -67,7 +67,8 @@ private[spark] object MetadataCleanerType extends Enumeration {
 
   type MetadataCleanerType = Value
 
-  def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
+  def systemProperty(which: MetadataCleanerType.MetadataCleanerType) =
+      "spark.cleaner.ttl." + which.toString
 }
 
 // TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
index 8b4e7c1..2110b35 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
@@ -21,7 +21,8 @@ import java.io.{Externalizable, ObjectOutput, ObjectInput}
 import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
 
 /**
- * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
+ * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is
+ * serializable.
  */
 private[spark]
 class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 3cf9489..5f86795 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -224,24 +224,26 @@ private[spark] object SizeEstimator extends Logging {
   }
 
   private def primitiveSize(cls: Class[_]): Long = {
-    if (cls == classOf[Byte])
+    if (cls == classOf[Byte]) {
       BYTE_SIZE
-    else if (cls == classOf[Boolean])
+    } else if (cls == classOf[Boolean]) {
       BOOLEAN_SIZE
-    else if (cls == classOf[Char])
+    } else if (cls == classOf[Char]) {
       CHAR_SIZE
-    else if (cls == classOf[Short])
+    } else if (cls == classOf[Short]) {
       SHORT_SIZE
-    else if (cls == classOf[Int])
+    } else if (cls == classOf[Int]) {
       INT_SIZE
-    else if (cls == classOf[Long])
+    } else if (cls == classOf[Long]) {
       LONG_SIZE
-    else if (cls == classOf[Float])
+    } else if (cls == classOf[Float]) {
       FLOAT_SIZE
-    else if (cls == classOf[Double])
+    } else if (cls == classOf[Double]) {
       DOUBLE_SIZE
-    else throw new IllegalArgumentException(
+    } else {
+        throw new IllegalArgumentException(
       "Non-primitive class " + cls + " passed to primitiveSize()")
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 020d5ed..5b0d2c3 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -20,7 +20,8 @@ package org.apache.spark.util
 /**
  * A class for tracking the statistics of a set of numbers (count, mean and variance) in a
  * numerically robust way. Includes support for merging two StatCounters. Based on 
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]].
+ * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+ * Welford and Chan's algorithms for running variance]].
  *
  * @constructor Initialize the StatCounter with the given values.
  */
@@ -70,7 +71,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
         m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
         n += other.n
       }
-      this	   
+      this
     }
   }
 
@@ -91,10 +92,11 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
 
   /** Return the variance of the values. */
   def variance: Double = {
-    if (n == 0)
+    if (n == 0) {
       Double.NaN
-    else
+    } else {
       m2 / n
+    }
   }
 
   /**
@@ -102,10 +104,11 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
    * by N-1 instead of N.
    */
   def sampleVariance: Double = {
-    if (n <= 1)
+    if (n <= 1) {
       Double.NaN
-    else
+    } else {
       m2 / (n - 1)
+    }
   }
 
   /** Return the standard deviation of the values. */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 861ad62..c201d0a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -86,7 +86,8 @@ private[spark] object Utils extends Logging {
   }
 
   /** Serialize via nested stream using specific serializer */
-  def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
+  def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(
+      f: SerializationStream => Unit) = {
     val osWrapper = ser.serializeStream(new OutputStream {
       def write(b: Int) = os.write(b)
 
@@ -100,7 +101,8 @@ private[spark] object Utils extends Logging {
   }
 
   /** Deserialize via nested stream using specific serializer */
-  def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(f: DeserializationStream => Unit) = {
+  def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(
+      f: DeserializationStream => Unit) = {
     val isWrapper = ser.deserializeStream(new InputStream {
       def read(): Int = is.read()
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/Vector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index 83fa0bf..96da93d 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -26,24 +26,27 @@ class Vector(val elements: Array[Double]) extends Serializable {
   def apply(index: Int) = elements(index)
 
   def + (other: Vector): Vector = {
-    if (length != other.length)
+    if (length != other.length) {
       throw new IllegalArgumentException("Vectors of different length")
+    }
     Vector(length, i => this(i) + other(i))
   }
 
   def add(other: Vector) = this + other
 
   def - (other: Vector): Vector = {
-    if (length != other.length)
+    if (length != other.length) {
       throw new IllegalArgumentException("Vectors of different length")
+    }
     Vector(length, i => this(i) - other(i))
   }
 
   def subtract(other: Vector) = this - other
 
   def dot(other: Vector): Double = {
-    if (length != other.length)
+    if (length != other.length) {
       throw new IllegalArgumentException("Vectors of different length")
+    }
     var ans = 0.0
     var i = 0
     while (i < length) {
@@ -60,10 +63,12 @@ class Vector(val elements: Array[Double]) extends Serializable {
    * @return
    */
   def plusDot(plus: Vector, other: Vector): Double = {
-    if (length != other.length)
+    if (length != other.length) {
       throw new IllegalArgumentException("Vectors of different length")
-    if (length != plus.length)
+    }
+    if (length != plus.length) {
       throw new IllegalArgumentException("Vectors of different length")
+    }
     var ans = 0.0
     var i = 0
     while (i < length) {
@@ -74,8 +79,9 @@ class Vector(val elements: Array[Double]) extends Serializable {
   }
 
   def += (other: Vector): Vector = {
-    if (length != other.length)
+    if (length != other.length) {
       throw new IllegalArgumentException("Vectors of different length")
+    }
     var i = 0
     while (i < length) {
       elements(i) += other(i)
@@ -131,7 +137,8 @@ object Vector {
    * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers 
    * between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
    */
-  def random(length: Int, random: Random = new XORShiftRandom()) = Vector(length, _ => random.nextDouble())
+  def random(length: Int, random: Random = new XORShiftRandom()) =
+    Vector(length, _ => random.nextDouble())
 
   class Multiplier(num: Double) {
     def * (vec: Vector) = vec * num

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 856eb77..c9cf512 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -123,7 +123,7 @@ class BitSet(numBits: Int) extends Serializable {
     override def hasNext: Boolean = ind >= 0
     override def next() = {
       val tmp = ind
-      ind  = nextSetBit(ind+1)
+      ind  = nextSetBit(ind + 1)
       tmp
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 7eb300d..59ba1e4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -280,7 +280,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
 
     /**
      * Select a key with the minimum hash, then combine all values with the same key from all
-     * input streams
+     * input streams.
      */
     override def next(): (K, C) = {
       // Select a key from the StreamBuffer that holds the lowest key hash

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 5ded5d0..148c12e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -187,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
     override def hasNext: Boolean = pos != INVALID_POS
     override def next(): T = {
       val tmp = getValue(pos)
-      pos = nextPos(pos+1)
+      pos = nextPos(pos + 1)
       tmp
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index 88f1cef..c2d84a8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -19,18 +19,21 @@ package org.apache.spark.streaming.examples
 
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
-
+// scalastyle:off
 /**
- * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
+ * second.
  * Usage: StatefulNetworkWordCount <master> <hostname> <port>
  *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to in order
+ *   to receive data.
  *
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
  *    `$ ./bin/run-example org.apache.spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
  */
+// scalastyle:on
 object StatefulNetworkWordCount {
   def main(args: Array[String]) {
     if (args.length < 3) {
@@ -50,8 +53,8 @@ object StatefulNetworkWordCount {
     }
 
     // Create the context with a 1 second batch size
-    val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(1),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+    val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey",
+      Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
     ssc.checkpoint(".")
 
     // Create a NetworkInputDStream on target ip:port and count the

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index a0094d4..c6215fd 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -23,20 +23,24 @@ import com.twitter.algebird.HyperLogLog._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.twitter._
-
+// scalastyle:off
 /**
  * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
  * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
  * <p>
  * <p>
- *   This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ *   This
+ *   <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
  *   blog post</a> and this
- *   <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a>
- *   have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for estimating
- *   the cardinality of a data stream, i.e. the number of unique elements.
+ *   <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
+ *     blog post</a>
+ *   have good overviews of HyperLogLog (HLL). HLL is a memory-efficient data structure for
+ *   estimating the cardinality of a data stream, i.e. the number of unique elements.
  * <p><p>
- *   Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation.
+ *   Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
+ *   reduce operation.
  */
+// scalastyle:on
 object TwitterAlgebirdHLL {
   def main(args: Array[String]) {
     if (args.length < 1) {
@@ -82,7 +86,8 @@ object TwitterAlgebirdHLL {
         userSet ++= partial
         println("Exact distinct users this batch: %d".format(partial.size))
         println("Exact distinct users overall: %d".format(userSet.size))
-        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
+        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
+          ) * 100))
       }
     })
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
index a260098..0ac46c3 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -36,6 +36,7 @@ object PageView extends Serializable {
   }
 }
 
+// scalastyle:off
 /** Generates streaming events to simulate page views on a website.
   *
   * This should be used in tandem with PageViewStream.scala. Example:
@@ -44,7 +45,8 @@ object PageView extends Serializable {
   *
   * When running this, you may want to set the root logging level to ERROR in
   * conf/log4j.properties to reduce the verbosity of the output.
-  * */
+  */
+// scalastyle:on
 object PageViewGenerator {
   val pages = Map("http://foo.com/"        -> .7,
                   "http://foo.com/news"    -> 0.2,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index bb44bc3..2b130fb 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -21,7 +21,7 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.examples.StreamingExamples
-
+// scalastyle:off
 /** Analyses a streaming dataset of web page views. This class demonstrates several types of
   * operators available in Spark streaming.
   *
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.examples.StreamingExamples
   * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10
   * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
   */
+// scalastyle:on
 object PageViewStream {
   def main(args: Array[String]) {
     if (args.length != 3) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index a2cd49c..c2d9dcb 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -37,7 +37,8 @@ import org.apache.spark.streaming.dstream._
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
+ * @param kafkaParams Map of Kafka configuration parameters.
+ *                    See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
  * @param storageLevel RDD storage level.
@@ -134,12 +135,15 @@ class KafkaReceiver[
     }
   }
 
-  // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
-  // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+  // It is our responsibility to delete the consumer group when specifying autooffset.reset. This
+  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
   //
-  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
-  // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
+  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
+  // from Kafka's ConsoleConsumer. See code related to 'autooffset.reset' when it is set to
+  // 'smallest'/'largest':
+  // scalastyle:off
   // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+  // scalastyle:on
   private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
     try {
       val dir = "/consumers/" + groupId

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 15a2daa..5472d0c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -113,7 +113,8 @@ object KafkaUtils {
     ): JavaPairDStream[String, String] = {
     implicit val cmt: ClassTag[String] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
-    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+      storageLevel)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 960c6a3..6acba25 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -34,8 +34,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
   bytesToObjects: Seq[ByteString] => Iterator[T])
   extends Actor with Receiver with Logging {
 
-  override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
-    Connect(publisherUrl), subscribe)
+  override def preStart() = ZeroMQExtension(context.system)
+    .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
 
   def receive: Receive = {
 


Mime
View raw message