spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-6428] Turn on explicit type checking for public methods.
Date Fri, 03 Apr 2015 08:25:05 GMT
Repository: spark
Updated Branches:
  refs/heads/master c42c3fc7f -> 82701ee25


[SPARK-6428] Turn on explicit type checking for public methods.

This builds on my earlier pull requests and turns on the scalastyle check that requires explicit type annotations on public methods.

Author: Reynold Xin <rxin@databricks.com>

Closes #5342 from rxin/SPARK-6428 and squashes the following commits:

7b531ab [Reynold Xin] import ordering
2d9a8a5 [Reynold Xin] jl
e668b1c [Reynold Xin] override
9b9e119 [Reynold Xin] Parenthesis.
82e0cf5 [Reynold Xin] [SPARK-6428] Turn on explicit type checking for public methods.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82701ee2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82701ee2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82701ee2

Branch: refs/heads/master
Commit: 82701ee25fda64f03899713bc56f82ca6f278151
Parents: c42c3fc
Author: Reynold Xin <rxin@databricks.com>
Authored: Fri Apr 3 01:25:02 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Fri Apr 3 01:25:02 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaPairRDD.scala |  2 +-
 .../org/apache/spark/api/java/JavaRDD.scala     |  2 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala | 53 +++++++++++------
 .../org/apache/spark/examples/LocalKMeans.scala |  4 +-
 .../org/apache/spark/examples/LocalLR.scala     |  4 +-
 .../org/apache/spark/examples/LogQuery.scala    |  4 +-
 .../org/apache/spark/examples/SparkLR.scala     |  4 +-
 .../org/apache/spark/examples/SparkTC.scala     |  2 +-
 .../spark/examples/bagel/PageRankUtils.scala    |  2 +-
 .../spark/examples/mllib/MovieLensALS.scala     |  4 +-
 .../examples/streaming/ActorWordCount.scala     |  6 +-
 .../streaming/RecoverableNetworkWordCount.scala |  3 +-
 .../examples/streaming/ZeroMQWordCount.scala    |  6 +-
 .../clickstream/PageViewGenerator.scala         |  2 +-
 .../streaming/flume/FlumeInputDStream.scala     | 12 ++--
 .../kafka/DirectKafkaInputDStream.scala         |  5 +-
 .../apache/spark/streaming/kafka/KafkaRDD.scala |  4 +-
 .../streaming/twitter/TwitterInputDStream.scala |  2 +-
 .../spark/streaming/zeromq/ZeroMQReceiver.scala | 13 ++--
 .../org/apache/spark/graphx/EdgeContext.scala   |  3 +-
 .../org/apache/spark/graphx/EdgeDirection.scala | 12 ++--
 .../org/apache/spark/graphx/EdgeTriplet.scala   |  2 +-
 .../spark/graphx/impl/EdgePartition.scala       | 14 ++---
 .../apache/spark/graphx/impl/EdgeRDDImpl.scala  |  4 +-
 .../graphx/impl/ReplicatedVertexView.scala      |  2 +-
 .../spark/graphx/impl/VertexRDDImpl.scala       |  4 +-
 .../spark/graphx/lib/ConnectedComponents.scala  |  2 +-
 .../spark/graphx/lib/LabelPropagation.scala     |  4 +-
 .../org/apache/spark/graphx/lib/PageRank.scala  |  2 +-
 .../GraphXPrimitiveKeyOpenHashMap.scala         |  8 +--
 .../apache/spark/mllib/feature/Word2Vec.scala   |  2 +-
 scalastyle-config.xml                           |  2 +-
 .../apache/spark/sql/AnalysisException.scala    |  2 +-
 .../spark/sql/catalyst/analysis/package.scala   |  2 +-
 .../org/apache/spark/sql/sources/ddl.scala      |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +-
 .../org/apache/spark/sql/hive/HiveQl.scala      |  3 +-
 .../org/apache/spark/streaming/Checkpoint.scala |  2 +-
 .../streaming/api/java/JavaDStreamLike.scala    | 12 ++--
 .../streaming/api/java/JavaPairDStream.scala    |  2 +-
 .../api/java/JavaStreamingContext.scala         | 10 ++--
 .../spark/streaming/dstream/DStream.scala       |  2 +-
 .../tools/JavaAPICompletenessChecker.scala      |  4 +-
 .../apache/spark/tools/StoragePerfTester.scala  |  6 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 62 +++++++++++---------
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  6 +-
 46 files changed, 170 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index a023712..8441bb3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -661,7 +661,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    */
   def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: V) => f.call(x).asScala
+    def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
     implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.flatMapValues(fn))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 18ccd62..db4e996 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -192,7 +192,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
    */
   def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.call(x)
+    def fn: (T) => S = (x: T) => f.call(x)
     import com.google.common.collect.Ordering  // shadows scala.math.Ordering
     implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
     implicit val ctag: ClassTag[S] = fakeClassTag

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 8da4293..8bf0627 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.api.java
 
-import java.util.{Comparator, List => JList, Iterator => JIterator}
+import java.{lang => jl}
 import java.lang.{Iterable => JIterable, Long => JLong}
+import java.util.{Comparator, List => JList, Iterator => JIterator}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -93,7 +94,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * of the original partition.
    */
   def mapPartitionsWithIndex[R](
-      f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]],
+      f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] =
     new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
         preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -109,7 +110,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    def cm = implicitly[ClassTag[(K2, V2)]]
+    def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
     new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
@@ -119,7 +120,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
     JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
@@ -129,8 +130,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.call(x).asScala
-    new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
+    def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
+    new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
   }
 
   /**
@@ -139,8 +140,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.call(x).asScala
-    def cm = implicitly[ClassTag[(K2, V2)]]
+    def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+    def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
     JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
@@ -148,7 +149,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to each partition of this RDD.
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[U] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
@@ -157,7 +160,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
       preservesPartitioning: Boolean): JavaRDD[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[U] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     JavaRDD.fromRDD(
       rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
   }
@@ -166,8 +171,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to each partition of this RDD.
    */
   def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
-    new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
+    def fn: (Iterator[T]) => Iterator[jl.Double] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
+    new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
   }
 
   /**
@@ -175,7 +182,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
   JavaPairRDD[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
@@ -184,7 +193,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
       preservesPartitioning: Boolean): JavaDoubleRDD = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[jl.Double] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
       .map(x => x.doubleValue()))
   }
@@ -194,7 +205,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
       preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     JavaPairRDD.fromRDD(
       rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
   }
@@ -277,8 +290,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def zipPartitions[U, V](
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
-    def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
-      f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
+    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
+      (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
+        f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
+    }
     JavaRDD.fromRDD(
       rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
   }
@@ -441,8 +456,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
    * combine step happens locally on the master, equivalent to running a single reduce task.
    */
-  def countByValue(): java.util.Map[T, java.lang.Long] =
-    mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new java.lang.Long(x._2)))))
+  def countByValue(): java.util.Map[T, jl.Long] =
+    mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new jl.Long(x._2)))))
 
   /**
    * (Experimental) Approximate version of countByValue().

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index 17624c2..f73eac1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -40,8 +40,8 @@ object LocalKMeans {
   val convergeDist = 0.001
   val rand = new Random(42)
 
-  def generateData = {
-    def generatePoint(i: Int) = {
+  def generateData: Array[DenseVector[Double]] = {
+    def generatePoint(i: Int): DenseVector[Double] = {
       DenseVector.fill(D){rand.nextDouble * R}
     }
     Array.tabulate(N)(generatePoint)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
index 92a683a..a55e0dc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
@@ -37,8 +37,8 @@ object LocalLR {
 
   case class DataPoint(x: Vector[Double], y: Double)
 
-  def generateData = {
-    def generatePoint(i: Int) = {
+  def generateData: Array[DataPoint] = {
+    def generatePoint(i: Int): DataPoint = {
       val y = if(i % 2 == 0) -1 else 1
       val x = DenseVector.fill(D){rand.nextGaussian + y * R}
       DataPoint(x, y)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 74620ad..32e02ea 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -54,8 +54,8 @@ object LogQuery {
     // scalastyle:on
     /** Tracks the total query count and number of aggregate bytes for a particular group. */
     class Stats(val count: Int, val numBytes: Int) extends Serializable {
-      def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
-      override def toString = "bytes=%s\tn=%s".format(numBytes, count)
+      def merge(other: Stats): Stats = new Stats(count + other.count, numBytes + other.numBytes)
+      override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
     }
 
     def extractKey(line: String): (String, String, String) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
index 257a7d2..8c01a60 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
@@ -42,8 +42,8 @@ object SparkLR {
 
   case class DataPoint(x: Vector[Double], y: Double)
 
-  def generateData = {
-    def generatePoint(i: Int) = {
+  def generateData: Array[DataPoint] = {
+    def generatePoint(i: Int): DataPoint = {
       val y = if(i % 2 == 0) -1 else 1
       val x = DenseVector.fill(D){rand.nextGaussian + y * R}
       DataPoint(x, y)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
index f7f8308..772cd89 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
@@ -31,7 +31,7 @@ object SparkTC {
   val numVertices = 100
   val rand = new Random(42)
 
-  def generateGraph = {
+  def generateGraph: Seq[(Int, Int)] = {
     val edges: mutable.Set[(Int, Int)] = mutable.Set.empty
     while (edges.size < numEdges) {
       val from = rand.nextInt(numVertices)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
index e322d4c..ab6e63d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
@@ -90,7 +90,7 @@ class PRMessage() extends Message[String] with Serializable {
 }
 
 class CustomPartitioner(partitions: Int) extends Partitioner {
-  def numPartitions = partitions
+  def numPartitions: Int = partitions
 
   def getPartition(key: Any): Int = {
     val hash = key match {

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
index 1f4ca4f..0bc36ea 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -178,7 +178,9 @@ object MovieLensALS {
   def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean)
     : Double = {
 
-    def mapPredictedRating(r: Double) = if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
+    def mapPredictedRating(r: Double): Double = {
+      if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
+    }
 
     val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
     val predictionsAndRatings = predictions.map{ x =>

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index b433082..92867b4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -85,13 +85,13 @@ extends Actor with ActorHelper {
 
   lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
 
-  override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+  override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self)
 
-  def receive = {
+  def receive: PartialFunction[Any, Unit] = {
     case msg => store(msg.asInstanceOf[T])
   }
 
-  override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+  override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self)
 
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index c3a05c8..751b30e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -55,7 +55,8 @@ import org.apache.spark.util.IntParam
  */
 object RecoverableNetworkWordCount {
 
-  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {
+  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
+    : StreamingContext = {
 
     // If you do not see this printed, that means the StreamingContext has been loaded
     // from the new checkpoint

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 6510c70..e99d1ba 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -35,7 +35,7 @@ import org.apache.spark.SparkConf
  */
 object SimpleZeroMQPublisher {
 
-  def main(args: Array[String]) = {
+  def main(args: Array[String]): Unit = {
     if (args.length < 2) {
       System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
       System.exit(1)
@@ -45,7 +45,7 @@ object SimpleZeroMQPublisher {
     val acs: ActorSystem = ActorSystem()
 
     val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
-    implicit def stringToByteString(x: String) = ByteString(x)
+    implicit def stringToByteString(x: String): ByteString = ByteString(x)
     val messages: List[ByteString] = List("words ", "may ", "count ")
     while (true) {
       Thread.sleep(1000)
@@ -86,7 +86,7 @@ object ZeroMQWordCount {
     // Create the context and set the batch size
     val ssc = new StreamingContext(sparkConf, Seconds(2))
 
-    def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
+    def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
 
     // For this stream, a zeroMQ publisher should be running.
     val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index 8402491..54d996b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -94,7 +94,7 @@ object PageViewGenerator {
     while (true) {
       val socket = listener.accept()
       new Thread() {
-        override def run = {
+        override def run(): Unit = {
           println("Got client connected from: " + socket.getInetAddress)
           val out = new PrintWriter(socket.getOutputStream(), true)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 2de2a79..60e2994 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -37,8 +37,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.receiver.Receiver
 
-import org.jboss.netty.channel.ChannelPipelineFactory
-import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
 import org.jboss.netty.handler.codec.compression._
 
@@ -187,8 +186,8 @@ class FlumeReceiver(
     logInfo("Flume receiver stopped")
   }
 
-  override def preferredLocation = Some(host)
-  
+  override def preferredLocation: Option[String] = Option(host)
+
   /** A Netty Pipeline factory that will decompress incoming data from 
     * and the Netty client and compress data going back to the client.
     *
@@ -198,13 +197,12 @@ class FlumeReceiver(
     */
   private[streaming]
   class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
-
-    def getPipeline() = {
+    def getPipeline(): ChannelPipeline = {
       val pipeline = Channels.pipeline()
       val encoder = new ZlibEncoder(6)
       pipeline.addFirst("deflater", encoder)
       pipeline.addFirst("inflater", new ZlibDecoder())
       pipeline
+    }
   }
 }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 04e65cb..1b1fc80 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -129,8 +129,9 @@ class DirectKafkaInputDStream[
 
   private[streaming]
   class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
-    def batchForTime = data.asInstanceOf[mutable.HashMap[
-      Time, Array[OffsetRange.OffsetRangeTuple]]]
+    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+    }
 
     override def update(time: Time) {
       batchForTime.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 6d465bc..4a83b71 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -155,7 +155,7 @@ class KafkaRDD[
         .dropWhile(_.offset < requestOffset)
     }
 
-    override def close() = consumer.close()
+    override def close(): Unit = consumer.close()
 
     override def getNext(): R = {
       if (iter == null || !iter.hasNext) {
@@ -207,7 +207,7 @@ object KafkaRDD {
       fromOffsets: Map[TopicAndPartition, Long],
       untilOffsets: Map[TopicAndPartition, LeaderOffset],
       messageHandler: MessageAndMetadata[K, V] => R
-  ): KafkaRDD[K, V, U, T, R] = {
+    ): KafkaRDD[K, V, U, T, R] = {
     val leaders = untilOffsets.map { case (tp, lo) =>
         tp -> (lo.host, lo.port)
     }.toMap

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 4eacc47..7cf02d8 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -70,7 +70,7 @@ class TwitterReceiver(
     try {
       val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
       newTwitterStream.addListener(new StatusListener {
-        def onStatus(status: Status) = {
+        def onStatus(status: Status): Unit = {
           store(status)
         }
         // Unimplemented

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 5547058..588e6ba 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -29,13 +29,16 @@ import org.apache.spark.streaming.receiver.ActorHelper
 /**
  * A receiver to subscribe to ZeroMQ stream.
  */
-private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
-  subscribe: Subscribe,
-  bytesToObjects: Seq[ByteString] => Iterator[T])
+private[streaming] class ZeroMQReceiver[T: ClassTag](
+    publisherUrl: String,
+    subscribe: Subscribe,
+    bytesToObjects: Seq[ByteString] => Iterator[T])
   extends Actor with ActorHelper with Logging {
 
-  override def preStart() = ZeroMQExtension(context.system)
-    .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
+  override def preStart(): Unit = {
+    ZeroMQExtension(context.system)
+      .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
+  }
 
   def receive: Receive = {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala
index d8be02e..2343017 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala
@@ -62,7 +62,6 @@ object EdgeContext {
    *    , _ + _)
    * }}}
    */
-  def unapply[VD, ED, A](edge: EdgeContext[VD, ED, A]) =
+  def unapply[VD, ED, A](edge: EdgeContext[VD, ED, A]): Some[(VertexId, VertexId, VD, VD, ED)] =
     Some(edge.srcId, edge.dstId, edge.srcAttr, edge.dstAttr, edge.attr)
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
index 6f03eb1..058c8c8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
@@ -34,12 +34,12 @@ class EdgeDirection private (private val name: String) extends Serializable {
 
   override def toString: String = "EdgeDirection." + name
 
-  override def equals(o: Any) = o match {
+  override def equals(o: Any): Boolean = o match {
     case other: EdgeDirection => other.name == name
     case _ => false
   }
 
-  override def hashCode = name.hashCode
+  override def hashCode: Int = name.hashCode
 }
 
 
@@ -48,14 +48,14 @@ class EdgeDirection private (private val name: String) extends Serializable {
  */
 object EdgeDirection {
   /** Edges arriving at a vertex. */
-  final val In = new EdgeDirection("In")
+  final val In: EdgeDirection = new EdgeDirection("In")
 
   /** Edges originating from a vertex. */
-  final val Out = new EdgeDirection("Out")
+  final val Out: EdgeDirection = new EdgeDirection("Out")
 
   /** Edges originating from *or* arriving at a vertex of interest. */
-  final val Either = new EdgeDirection("Either")
+  final val Either: EdgeDirection = new EdgeDirection("Either")
 
   /** Edges originating from *and* arriving at a vertex of interest. */
-  final val Both = new EdgeDirection("Both")
+  final val Both: EdgeDirection = new EdgeDirection("Both")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index 9d473d5..c8790ca 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -62,7 +62,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
   def vertexAttr(vid: VertexId): VD =
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
 
-  override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+  override def toString: String = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
 
   def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 373af75..c561570 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -324,7 +324,7 @@ class EdgePartition[
    *
    * @return an iterator over edges in the partition
    */
-  def iterator = new Iterator[Edge[ED]] {
+  def iterator: Iterator[Edge[ED]] = new Iterator[Edge[ED]] {
     private[this] val edge = new Edge[ED]
     private[this] var pos = 0
 
@@ -351,7 +351,7 @@ class EdgePartition[
 
     override def hasNext: Boolean = pos < EdgePartition.this.size
 
-    override def next() = {
+    override def next(): EdgeTriplet[VD, ED] = {
       val triplet = new EdgeTriplet[VD, ED]
       val localSrcId = localSrcIds(pos)
       val localDstId = localDstIds(pos)
@@ -518,11 +518,11 @@ private class AggregatingEdgeContext[VD, ED, A](
     _attr = attr
   }
 
-  override def srcId = _srcId
-  override def dstId = _dstId
-  override def srcAttr = _srcAttr
-  override def dstAttr = _dstAttr
-  override def attr = _attr
+  override def srcId: VertexId = _srcId
+  override def dstId: VertexId = _dstId
+  override def srcAttr: VD = _srcAttr
+  override def dstAttr: VD = _dstAttr
+  override def attr: ED = _attr
 
   override def sendToSrc(msg: A) {
     send(_localSrcId, msg)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 43a3aea..c88b2f6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -70,9 +70,9 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
     this
   }
 
-  override def getStorageLevel = partitionsRDD.getStorageLevel
+  override def getStorageLevel: StorageLevel = partitionsRDD.getStorageLevel
 
-  override def checkpoint() = {
+  override def checkpoint(): Unit = {
     partitionsRDD.checkpoint()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 8ab255b..1df8644 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -50,7 +50,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
    * Return a new `ReplicatedVertexView` where edges are reversed and shipping levels are swapped to
    * match.
    */
-  def reverse() = {
+  def reverse(): ReplicatedVertexView[VD, ED] = {
     val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
     new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 349c854..33ac7b0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -71,9 +71,9 @@ class VertexRDDImpl[VD] private[graphx] (
     this
   }
 
-  override def getStorageLevel = partitionsRDD.getStorageLevel
+  override def getStorageLevel: StorageLevel = partitionsRDD.getStorageLevel
 
-  override def checkpoint() = {
+  override def checkpoint(): Unit = {
     partitionsRDD.checkpoint()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index e2f6cc1..859f896 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -37,7 +37,7 @@ object ConnectedComponents {
    */
   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
     val ccGraph = graph.mapVertices { case (vid, _) => vid }
-    def sendMessage(edge: EdgeTriplet[VertexId, ED]) = {
+    def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
       if (edge.srcAttr < edge.dstAttr) {
         Iterator((edge.dstId, edge.srcAttr))
       } else if (edge.srcAttr > edge.dstAttr) {

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
index 82e9e06..2bcf868 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
@@ -43,7 +43,7 @@ object LabelPropagation {
    */
   def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, ED] = {
     val lpaGraph = graph.mapVertices { case (vid, _) => vid }
-    def sendMessage(e: EdgeTriplet[VertexId, ED]) = {
+    def sendMessage(e: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, Map[VertexId, VertexId])] = {
       Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 1L)))
     }
     def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
@@ -54,7 +54,7 @@ object LabelPropagation {
         i -> (count1Val + count2Val)
       }.toMap
     }
-    def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]) = {
+    def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
       if (message.isEmpty) attr else message.maxBy(_._2)._1
     }
     val initialMessage = Map[VertexId, Long]()

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 570440b..042e366 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -156,7 +156,7 @@ object PageRank extends Logging {
       (newPR, newPR - oldPR)
     }
 
-    def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
+    def sendMessage(edge: EdgeTriplet[(Double, Double), Double]): Iterator[(VertexId, Double)] = {
       if (edge.srcAttr._2 > tol) {
         Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
index 57b01b6..e2754ea 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
@@ -56,7 +56,7 @@ class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
 
   private var _oldValues: Array[V] = null
 
-  override def size = keySet.size
+  override def size: Int = keySet.size
 
   /** Get the value for a given key */
   def apply(k: K): V = {
@@ -112,7 +112,7 @@ class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
     }
   }
 
-  override def iterator = new Iterator[(K, V)] {
+  override def iterator: Iterator[(K, V)] = new Iterator[(K, V)] {
     var pos = 0
     var nextPair: (K, V) = computeNextPair()
 
@@ -128,9 +128,9 @@ class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
       }
     }
 
-    def hasNext = nextPair != null
+    def hasNext: Boolean = nextPair != null
 
-    def next() = {
+    def next(): (K, V) = {
       val pair = nextPair
       nextPair = computeNextPair()
       pair

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 9ee7e4a..b2d9053 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -522,7 +522,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
       new Word2VecModel(word2VecMap)
     }
 
-    def save(sc: SparkContext, path: String, model: Map[String, Array[Float]]) = {
+    def save(sc: SparkContext, path: String, model: Map[String, Array[Float]]): Unit = {
 
       val sqlContext = new SQLContext(sc)
       import sqlContext.implicits._

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 459a503..7168d5b 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -137,7 +137,7 @@
  <!--   <parameter name="maxMethods"><![CDATA[30]]></parameter> -->
  <!--  </parameters> -->
  <!-- </check> -->
- <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
  <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
  <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
  <check level="error" class="org.scalastyle.scalariform.NonASCIICharacterChecker" enabled="true"></check>

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index 34fedea..f999218 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -30,7 +30,7 @@ class AnalysisException protected[sql] (
     val startPosition: Option[Int] = None)
   extends Exception with Serializable {
 
-  def withPosition(line: Option[Int], startPosition: Option[Int]) = {
+  def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException = {
     val newException = new AnalysisException(message, line, startPosition)
     newException.setStackTrace(getStackTrace)
     newException

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
index c61c395..7731336 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
@@ -44,7 +44,7 @@ package object analysis {
   }
 
   /** Catches any AnalysisExceptions thrown by `f` and attaches `t`'s position if any. */
-  def withPosition[A](t: TreeNode[_])(f: => A) = {
+  def withPosition[A](t: TreeNode[_])(f: => A): A = {
     try f catch {
       case a: AnalysisException =>
         throw a.withPosition(t.origin.line, t.origin.startPosition)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index eb46b46..319de71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -204,7 +204,7 @@ private[sql] object ResolvedDataSource {
       provider: String,
       options: Map[String, String]): ResolvedDataSource = {
     val clazz: Class[_] = lookupDataSource(provider)
-    def className = clazz.getCanonicalName
+    def className: String = clazz.getCanonicalName
     val relation = userSpecifiedSchema match {
       case Some(schema: StructType) => clazz.newInstance() match {
         case dataSource: SchemaRelationProvider =>

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c4da34a..ae5ce4c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -861,7 +861,7 @@ private[hive] case class MetastoreRelation
   /** An attribute map for determining the ordinal for non-partition columns. */
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
-  override def newInstance() = {
+  override def newInstance(): MetastoreRelation = {
     MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 5be09a1..077e641 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -659,7 +659,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
                   AttributeReference("value", StringType)()), true)
             }
 
-            def matchSerDe(clause: Seq[ASTNode]) = clause match {
+            def matchSerDe(clause: Seq[ASTNode])
+              : (Seq[(String, String)], String, Seq[(String, String)]) = clause match {
               case Token("TOK_SERDEPROPS", propsClause) :: Nil =>
                 val rowFormat = propsClause.map {
                   case Token(name, Token(value, Nil) :: Nil) => (name, value)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index f73b463..28703ef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -234,7 +234,7 @@ object CheckpointReader extends Logging {
     val checkpointPath = new Path(checkpointDir)
 
     // TODO(rxin): Why is this a def?!
-    def fs = checkpointPath.getFileSystem(hadoopConf)
+    def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
 
     // Try to find the checkpoint files
     val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 73030e1..808dcc1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -169,7 +169,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
     new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
@@ -179,7 +179,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    */
   def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
     def cm: ClassTag[(K2, V2)] = fakeClassTag
     new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
@@ -190,7 +190,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * of the RDD.
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[U] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
@@ -201,7 +203,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    */
   def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
   : JavaPairDStream[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index f94f2d0..93baad1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -526,7 +526,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    */
   def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: V) => f.apply(x).asScala
+    def fn: (V) => Iterable[U] = (x: V) => f.apply(x).asScala
     implicit val cm: ClassTag[U] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
     dstream.flatMapValues(fn)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index e3db01c..4095a7c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -192,7 +192,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
       converter: JFunction[InputStream, java.lang.Iterable[T]],
       storageLevel: StorageLevel)
   : JavaReceiverInputDStream[T] = {
-    def fn = (x: InputStream) => converter.call(x).toIterator
+    def fn: (InputStream) => Iterator[T] = (x: InputStream) => converter.call(x).toIterator
     implicit val cmt: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     ssc.socketStream(hostname, port, fn, storageLevel)
@@ -313,7 +313,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
     implicit val cmk: ClassTag[K] = ClassTag(kClass)
     implicit val cmv: ClassTag[V] = ClassTag(vClass)
     implicit val cmf: ClassTag[F] = ClassTag(fClass)
-    def fn = (x: Path) => filter.call(x).booleanValue()
+    def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
     ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
   }
 
@@ -344,7 +344,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
     implicit val cmk: ClassTag[K] = ClassTag(kClass)
     implicit val cmv: ClassTag[V] = ClassTag(vClass)
     implicit val cmf: ClassTag[F] = ClassTag(fClass)
-    def fn = (x: Path) => filter.call(x).booleanValue()
+    def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
     ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
   }
 
@@ -625,7 +625,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Stop the execution of the streams.
    * @param stopSparkContext Stop the associated SparkContext or not
    */
-  def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)
+  def stop(stopSparkContext: Boolean): Unit = ssc.stop(stopSparkContext)
 
   /**
    * Stop the execution of the streams.
@@ -633,7 +633,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * @param stopGracefully Stop gracefully by waiting for the processing of all
    *                       received data to be completed
    */
-  def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
+  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
     ssc.stop(stopSparkContext, stopGracefully)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 795c5aa..24f99a2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -839,7 +839,7 @@ object DStream {
 
     /** Filtering function that excludes non-user classes for a streaming application */
     def streamingExclustionFunction(className: String): Boolean = {
-      def doesMatch(r: Regex) = r.findFirstIn(className).isDefined
+      def doesMatch(r: Regex): Boolean = r.findFirstIn(className).isDefined
       val isSparkClass = doesMatch(SPARK_CLASS_REGEX)
       val isSparkExampleClass = doesMatch(SPARK_EXAMPLES_CLASS_REGEX)
       val isSparkStreamingTestClass = doesMatch(SPARK_STREAMING_TESTCLASS_REGEX)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index 8d0f099..583823c 100644
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.tools
 
-import java.lang.reflect.Method
+import java.lang.reflect.{Type, Method}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.language.existentials
@@ -302,7 +302,7 @@ object JavaAPICompletenessChecker {
   private def isExcludedByInterface(method: Method): Boolean = {
     val excludedInterfaces =
       Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
-    def toComparisionKey(method: Method) =
+    def toComparisionKey(method: Method): (Class[_], String, Type) =
       (method.getReturnType, method.getName, method.getGenericReturnType)
     val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
       excludedInterfaces.contains(i.getName)

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 6b666a0..f2d1353 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
  * Writes simulated shuffle output from several threads and records the observed throughput.
  */
 object StoragePerfTester {
-  def main(args: Array[String]) = {
+  def main(args: Array[String]): Unit = {
     /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
     val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
 
@@ -58,7 +58,7 @@ object StoragePerfTester {
     val sc = new SparkContext("local[4]", "Write Tester", conf)
     val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
 
-    def writeOutputBytes(mapId: Int, total: AtomicLong) = {
+    def writeOutputBytes(mapId: Int, total: AtomicLong): Unit = {
       val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, mapId, numOutputSplits,
         new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
       val writers = shuffle.writers
@@ -78,7 +78,7 @@ object StoragePerfTester {
     val totalBytes = new AtomicLong()
     for (task <- 1 to numMaps) {
       executor.submit(new Runnable() {
-        override def run() = {
+        override def run(): Unit = {
           try {
             writeOutputBytes(task, totalBytes)
             latch.countDown()

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 3d18690..455554e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -162,7 +162,7 @@ private[spark] class ApplicationMaster(
    * status to SUCCEEDED in cluster mode to handle if the user calls System.exit
    * from the application code.
    */
-  final def getDefaultFinalStatus() = {
+  final def getDefaultFinalStatus(): FinalApplicationStatus = {
     if (isClusterMode) {
       FinalApplicationStatus.SUCCEEDED
     } else {
@@ -175,31 +175,35 @@ private[spark] class ApplicationMaster(
    * This means the ResourceManager will not retry the application attempt on your behalf if
    * a failure occurred.
    */
-  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
-    if (!unregistered) {
-      logInfo(s"Unregistering ApplicationMaster with $status" +
-        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
-      unregistered = true
-      client.unregister(status, Option(diagnostics).getOrElse(""))
+  final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = {
+    synchronized {
+      if (!unregistered) {
+        logInfo(s"Unregistering ApplicationMaster with $status" +
+          Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+        unregistered = true
+        client.unregister(status, Option(diagnostics).getOrElse(""))
+      }
     }
   }
 
-  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
-    if (!finished) {
-      val inShutdown = Utils.inShutdown()
-      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
-        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
-      exitCode = code
-      finalStatus = status
-      finalMsg = msg
-      finished = true
-      if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
-        logDebug("shutting down reporter thread")
-        reporterThread.interrupt()
-      }
-      if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) {
-        logDebug("shutting down user thread")
-        userClassThread.interrupt()
+  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = {
+    synchronized {
+      if (!finished) {
+        val inShutdown = Utils.inShutdown()
+        logInfo(s"Final app status: $status, exitCode: $code" +
+          Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
+        exitCode = code
+        finalStatus = status
+        finalMsg = msg
+        finished = true
+        if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
+          logDebug("shutting down reporter thread")
+          reporterThread.interrupt()
+        }
+        if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) {
+          logDebug("shutting down user thread")
+          userClassThread.interrupt()
+        }
       }
     }
   }
@@ -506,7 +510,7 @@ private[spark] class ApplicationMaster(
   private class AMActor(driverUrl: String, isClusterMode: Boolean) extends Actor {
     var driver: ActorSelection = _
 
-    override def preStart() = {
+    override def preStart(): Unit = {
       logInfo("Listen to driver: " + driverUrl)
       driver = context.actorSelection(driverUrl)
       // Send a hello message to establish the connection, after which
@@ -520,7 +524,7 @@ private[spark] class ApplicationMaster(
       }
     }
 
-    override def receive = {
+    override def receive: PartialFunction[Any, Unit] = {
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         // In cluster mode, do not rely on the disassociated event to exit
@@ -567,7 +571,7 @@ object ApplicationMaster extends Logging {
 
   private var master: ApplicationMaster = _
 
-  def main(args: Array[String]) = {
+  def main(args: Array[String]): Unit = {
     SignalLogger.register(log)
     val amArgs = new ApplicationMasterArguments(args)
     SparkHadoopUtil.get.runAsSparkUser { () =>
@@ -576,11 +580,11 @@ object ApplicationMaster extends Logging {
     }
   }
 
-  private[spark] def sparkContextInitialized(sc: SparkContext) = {
+  private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
     master.sparkContextInitialized(sc)
   }
 
-  private[spark] def sparkContextStopped(sc: SparkContext) = {
+  private[spark] def sparkContextStopped(sc: SparkContext): Boolean = {
     master.sparkContextStopped(sc)
   }
 
@@ -592,7 +596,7 @@ object ApplicationMaster extends Logging {
  */
 object ExecutorLauncher {
 
-  def main(args: Array[String]) = {
+  def main(args: Array[String]): Unit = {
     ApplicationMaster.main(args)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82701ee2/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index c1d3f73..1ce10d9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -59,15 +59,15 @@ class ExecutorRunnable(
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   lazy val env = prepareEnvironment(container)
 
-  def run = {
+  override def run(): Unit = {
     logInfo("Starting Executor Container")
     nmClient = NMClient.createNMClient()
     nmClient.init(yarnConf)
     nmClient.start()
-    startContainer
+    startContainer()
   }
 
-  def startContainer = {
+  def startContainer(): java.util.Map[String, ByteBuffer] = {
     logInfo("Setting up ContainerLaunchContext")
 
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message