spark-commits mailing list archives

From r...@apache.org
Subject [2/2] spark git commit: [SPARK-6428][SQL] Added explicit type for all public methods in sql/core
Date Fri, 20 Mar 2015 22:48:43 GMT
[SPARK-6428][SQL] Added explicit type for all public methods in sql/core

Also implemented equals/hashCode where they were missing.

This is done to enable automated type checking of public methods.
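As a rough illustration of the convention (a hypothetical, simplified class for this note only, not the actual org.apache.spark.sql.Column), public members spell out their result types, and a class that defines equals keeps a consistent hashCode:

    // Minimal sketch; the class and its fields are illustrative only.
    class NamedColumn(val name: String) {
      // Inferred signatures such as `override def toString = name`
      // are replaced with explicit result types:
      override def toString: String = name

      // Structural equality, declared with an explicit Boolean result type.
      override def equals(that: Any): Boolean = that match {
        case other: NamedColumn => other.name == this.name
        case _ => false
      }

      // hashCode kept consistent with equals.
      override def hashCode(): Int = name.hashCode
    }

With every public signature written out, an automated check can flag any public method whose type would otherwise be left to inference.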

Author: Reynold Xin <rxin@databricks.com>

Closes #5104 from rxin/sql-hashcode-explicittype and squashes the following commits:

ffce6f3 [Reynold Xin] Code review feedback.
8b36733 [Reynold Xin] [SPARK-6428][SQL] Added explicit type for all public methods.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a95043b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a95043b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a95043b1

Branch: refs/heads/master
Commit: a95043b1780bfde556db2dcc01511e40a12498dd
Parents: 257cde7
Author: Reynold Xin <rxin@databricks.com>
Authored: Fri Mar 20 15:47:07 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Fri Mar 20 15:47:07 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/AttributeSet.scala |  3 +-
 .../spark/sql/catalyst/expressions/rows.scala   | 21 +++++
 .../org/apache/spark/sql/types/Decimal.scala    |  2 +-
 .../org/apache/spark/sql/types/dataTypes.scala  | 20 ++---
 .../scala/org/apache/spark/sql/Column.scala     |  2 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  |  6 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  8 +-
 .../org/apache/spark/sql/UDFRegistration.scala  |  2 +-
 .../spark/sql/columnar/ColumnAccessor.scala     |  4 +-
 .../spark/sql/columnar/ColumnBuilder.scala      |  4 +-
 .../apache/spark/sql/columnar/ColumnStats.scala | 24 +++---
 .../apache/spark/sql/columnar/ColumnType.scala  | 56 +++++++-------
 .../columnar/InMemoryColumnarTableScan.scala    | 56 ++++++++------
 .../sql/columnar/NullableColumnAccessor.scala   |  2 +-
 .../CompressibleColumnAccessor.scala            |  4 +-
 .../compression/CompressibleColumnBuilder.scala |  2 +-
 .../compression/compressionSchemes.scala        | 80 +++++++++++---------
 .../apache/spark/sql/execution/Aggregate.scala  |  8 +-
 .../apache/spark/sql/execution/Exchange.scala   | 19 ++---
 .../spark/sql/execution/ExistingRDD.scala       | 20 ++---
 .../org/apache/spark/sql/execution/Expand.scala |  5 +-
 .../apache/spark/sql/execution/Generate.scala   |  3 +-
 .../sql/execution/GeneratedAggregate.scala      | 11 +--
 .../spark/sql/execution/LocalTableScan.scala    |  7 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |  1 +
 .../sql/execution/SparkSqlSerializer.scala      |  2 +-
 .../spark/sql/execution/SparkStrategies.scala   |  6 +-
 .../spark/sql/execution/basicOperators.scala    | 73 +++++++++---------
 .../apache/spark/sql/execution/commands.scala   | 33 ++++----
 .../spark/sql/execution/debug/package.scala     | 30 ++++----
 .../sql/execution/joins/BroadcastHashJoin.scala | 10 ++-
 .../joins/BroadcastLeftSemiJoinHash.scala       | 10 +--
 .../joins/BroadcastNestedLoopJoin.scala         |  5 +-
 .../sql/execution/joins/CartesianProduct.scala  |  8 +-
 .../spark/sql/execution/joins/HashJoin.scala    |  4 +-
 .../sql/execution/joins/HashOuterJoin.scala     | 53 ++++++-------
 .../sql/execution/joins/HashedRelation.scala    |  4 +-
 .../sql/execution/joins/LeftSemiJoinBNL.scala   | 10 ++-
 .../sql/execution/joins/LeftSemiJoinHash.scala  | 11 +--
 .../sql/execution/joins/ShuffledHashJoin.scala  |  6 +-
 .../apache/spark/sql/execution/pythonUdfs.scala | 21 +++--
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala     |  3 +-
 .../apache/spark/sql/jdbc/JDBCRelation.scala    |  8 +-
 .../apache/spark/sql/json/JSONRelation.scala    |  6 +-
 .../spark/sql/parquet/ParquetConverter.scala    |  2 +-
 .../spark/sql/parquet/ParquetRelation.scala     | 10 ++-
 .../sql/parquet/ParquetTableOperations.scala    | 12 +--
 .../apache/spark/sql/parquet/newParquet.scala   | 37 ++++++---
 .../spark/sql/parquet/timestamp/NanoTime.scala  |  6 +-
 .../spark/sql/sources/LogicalRelation.scala     | 16 ++--
 .../org/apache/spark/sql/sources/commands.scala |  2 +-
 .../org/apache/spark/sql/sources/ddl.scala      |  6 +-
 .../org/apache/spark/sql/sources/rules.scala    |  4 +-
 53 files changed, 438 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index a9ba0be..adaeab0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.analysis.Star
 
 protected class AttributeEquals(val a: Attribute) {
   override def hashCode() = a match {
@@ -115,7 +114,7 @@ class AttributeSet private (val baseSet: Set[AttributeEquals])
   // sorts of things in its closure.
   override def toSeq: Seq[Attribute] = baseSet.map(_.a).toArray.toSeq
 
-  override def toString = "{" + baseSet.map(_.a).mkString(", ") + "}"
+  override def toString: String = "{" + baseSet.map(_.a).mkString(", ") + "}"
 
   override def isEmpty: Boolean = baseSet.isEmpty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index faa3667..f03d6f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -146,6 +146,27 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
     result
   }
 
+  override def equals(o: Any): Boolean = o match {
+    case other: Row =>
+      if (values.length != other.length) {
+        return false
+      }
+
+      var i = 0
+      while (i < values.length) {
+        if (isNullAt(i) != other.isNullAt(i)) {
+          return false
+        }
+        if (apply(i) != other.apply(i)) {
+          return false
+        }
+        i += 1
+      }
+      true
+
+    case _ => false
+  }
+
   def copy() = this
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 21cc6ce..994c520 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -246,7 +246,7 @@ final class Decimal extends Ordered[Decimal] with Serializable {
     }
   }
 
-  override def equals(other: Any) = other match {
+  override def equals(other: Any): Boolean = other match {
     case d: Decimal =>
       compare(d) == 0
     case _ =>

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index bf39603..d973144 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -307,7 +307,7 @@ protected[sql] object NativeType {
 
 
 protected[sql] trait PrimitiveType extends DataType {
-  override def isPrimitive = true
+  override def isPrimitive: Boolean = true
 }
 
 
@@ -442,7 +442,7 @@ class TimestampType private() extends NativeType {
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
 
   private[sql] val ordering = new Ordering[JvmType] {
-    def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
+    def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
   }
 
   /**
@@ -542,7 +542,7 @@ class LongType private() extends IntegralType {
    */
   override def defaultSize: Int = 8
 
-  override def simpleString = "bigint"
+  override def simpleString: String = "bigint"
 
   private[spark] override def asNullable: LongType = this
 }
@@ -572,7 +572,7 @@ class IntegerType private() extends IntegralType {
    */
   override def defaultSize: Int = 4
 
-  override def simpleString = "int"
+  override def simpleString: String = "int"
 
   private[spark] override def asNullable: IntegerType = this
 }
@@ -602,7 +602,7 @@ class ShortType private() extends IntegralType {
    */
   override def defaultSize: Int = 2
 
-  override def simpleString = "smallint"
+  override def simpleString: String = "smallint"
 
   private[spark] override def asNullable: ShortType = this
 }
@@ -632,7 +632,7 @@ class ByteType private() extends IntegralType {
    */
   override def defaultSize: Int = 1
 
-  override def simpleString = "tinyint"
+  override def simpleString: String = "tinyint"
 
   private[spark] override def asNullable: ByteType = this
 }
@@ -696,7 +696,7 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalT
    */
   override def defaultSize: Int = 4096
 
-  override def simpleString = precisionInfo match {
+  override def simpleString: String = precisionInfo match {
     case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
     case None => "decimal(10,0)"
   }
@@ -836,7 +836,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
    */
   override def defaultSize: Int = 100 * elementType.defaultSize
 
-  override def simpleString = s"array<${elementType.simpleString}>"
+  override def simpleString: String = s"array<${elementType.simpleString}>"
 
   private[spark] override def asNullable: ArrayType =
     ArrayType(elementType.asNullable, containsNull = true)
@@ -1065,7 +1065,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
    */
   override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
 
-  override def simpleString = {
+  override def simpleString: String = {
     val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
     s"struct<${fieldTypes.mkString(",")}>"
   }
@@ -1142,7 +1142,7 @@ case class MapType(
    */
   override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)
 
-  override def simpleString = s"map<${keyType.simpleString},${valueType.simpleString}>"
+  override def simpleString: String = s"map<${keyType.simpleString},${valueType.simpleString}>"
 
   private[spark] override def asNullable: MapType =
     MapType(keyType.asNullable, valueType.asNullable, valueContainsNull = true)

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 908c78a..b7a13a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -59,7 +59,7 @@ class Column(protected[sql] val expr: Expression) {
 
   override def toString: String = expr.prettyString
 
-  override def equals(that: Any) = that match {
+  override def equals(that: Any): Boolean = that match {
     case that: Column => that.expr.equals(this.expr)
     case _ => false
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 46f5070..8b8f86c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.sql.catalyst.{ScalaReflection, SqlParser}
+import org.apache.spark.sql.catalyst.{expressions, ScalaReflection, SqlParser}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, ResolvedStar}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
@@ -722,7 +722,7 @@ class DataFrame private[sql](
     : DataFrame = {
     val dataType = ScalaReflection.schemaFor[B].dataType
     val attributes = AttributeReference(outputColumn, dataType)() :: Nil
-    def rowFunction(row: Row) = {
+    def rowFunction(row: Row): TraversableOnce[Row] = {
       f(row(0).asInstanceOf[A]).map(o => Row(ScalaReflection.convertToCatalyst(o, dataType)))
     }
     val generator = UserDefinedGenerator(attributes, rowFunction, apply(inputColumn).expr :: Nil)
@@ -1155,7 +1155,7 @@ class DataFrame private[sql](
       val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
 
       new Iterator[String] {
-        override def hasNext = iter.hasNext
+        override def hasNext: Boolean = iter.hasNext
         override def next(): String = {
           JsonRDD.rowToJSON(rowSchema, gen)(iter.next())
           gen.flush()

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6de46a5..dc9912b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -144,7 +144,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   @transient
   protected[sql] val tlSession = new ThreadLocal[SQLSession]() {
-    override def initialValue = defaultSession
+    override def initialValue: SQLSession = defaultSession
   }
 
   @transient
@@ -988,9 +988,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     val sqlContext: SQLContext = self
 
-    def codegenEnabled = self.conf.codegenEnabled
+    def codegenEnabled: Boolean = self.conf.codegenEnabled
 
-    def numPartitions = self.conf.numShufflePartitions
+    def numPartitions: Int = self.conf.numShufflePartitions
 
     def strategies: Seq[Strategy] =
       experimental.extraStrategies ++ (
@@ -1109,7 +1109,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     lazy val analyzed: LogicalPlan = analyzer(logical)
     lazy val withCachedData: LogicalPlan = {
-      assertAnalyzed
+      assertAnalyzed()
       cacheManager.useCachedData(analyzed)
     }
     lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 8051df2..b97aaf7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -61,7 +61,7 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
 
     val dataType = sqlContext.parseDataType(stringDataType)
 
-    def builder(e: Seq[Expression]) =
+    def builder(e: Seq[Expression]): PythonUDF =
       PythonUDF(
         name,
         command,

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
index b615eaa..f615fb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -48,9 +48,9 @@ private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
 
   protected def initialize() {}
 
-  def hasNext = buffer.hasRemaining
+  override def hasNext: Boolean = buffer.hasRemaining
 
-  def extractTo(row: MutableRow, ordinal: Int): Unit = {
+  override def extractTo(row: MutableRow, ordinal: Int): Unit = {
     extractSingle(row, ordinal)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index d8d24a5..c881747 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -58,7 +58,7 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
   override def initialize(
       initialSize: Int,
       columnName: String = "",
-      useCompression: Boolean = false) = {
+      useCompression: Boolean = false): Unit = {
 
     val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
     this.columnName = columnName
@@ -73,7 +73,7 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
     columnType.append(row, ordinal, buffer)
   }
 
-  override def build() = {
+  override def build(): ByteBuffer = {
     buffer.flip().asInstanceOf[ByteBuffer]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 04047b9..87a6631 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -76,7 +76,7 @@ private[sql] sealed trait ColumnStats extends Serializable {
 private[sql] class NoopColumnStats extends ColumnStats {
   override def gatherStats(row: Row, ordinal: Int): Unit = super.gatherStats(row, ordinal)
 
-  def collectedStatistics = Row(null, null, nullCount, count, 0L)
+  override def collectedStatistics: Row = Row(null, null, nullCount, count, 0L)
 }
 
 private[sql] class BooleanColumnStats extends ColumnStats {
@@ -93,7 +93,7 @@ private[sql] class BooleanColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class ByteColumnStats extends ColumnStats {
@@ -110,7 +110,7 @@ private[sql] class ByteColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class ShortColumnStats extends ColumnStats {
@@ -127,7 +127,7 @@ private[sql] class ShortColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class LongColumnStats extends ColumnStats {
@@ -144,7 +144,7 @@ private[sql] class LongColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class DoubleColumnStats extends ColumnStats {
@@ -161,7 +161,7 @@ private[sql] class DoubleColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class FloatColumnStats extends ColumnStats {
@@ -178,7 +178,7 @@ private[sql] class FloatColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class FixedDecimalColumnStats extends ColumnStats {
@@ -212,7 +212,7 @@ private[sql] class IntColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class StringColumnStats extends ColumnStats {
@@ -229,7 +229,7 @@ private[sql] class StringColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class DateColumnStats extends IntColumnStats
@@ -248,7 +248,7 @@ private[sql] class TimestampColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class BinaryColumnStats extends ColumnStats {
@@ -259,7 +259,7 @@ private[sql] class BinaryColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(null, null, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(null, null, nullCount, count, sizeInBytes)
 }
 
 private[sql] class GenericColumnStats extends ColumnStats {
@@ -270,5 +270,5 @@ private[sql] class GenericColumnStats extends ColumnStats {
     }
   }
 
-  def collectedStatistics = Row(null, null, nullCount, count, sizeInBytes)
+  override def collectedStatistics: Row = Row(null, null, nullCount, count, sizeInBytes)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index 36ea1c7..c47497e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -98,7 +98,7 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
    */
   def clone(v: JvmType): JvmType = v
 
-  override def toString = getClass.getSimpleName.stripSuffix("$")
+  override def toString: String = getClass.getSimpleName.stripSuffix("$")
 }
 
 private[sql] abstract class NativeColumnType[T <: NativeType](
@@ -114,7 +114,7 @@ private[sql] abstract class NativeColumnType[T <: NativeType](
 }
 
 private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
-  def append(v: Int, buffer: ByteBuffer): Unit = {
+  override def append(v: Int, buffer: ByteBuffer): Unit = {
     buffer.putInt(v)
   }
 
@@ -122,7 +122,7 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
     buffer.putInt(row.getInt(ordinal))
   }
 
-  def extract(buffer: ByteBuffer) = {
+  override def extract(buffer: ByteBuffer): Int = {
     buffer.getInt()
   }
 
@@ -134,7 +134,7 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
     row.setInt(ordinal, value)
   }
 
-  override def getField(row: Row, ordinal: Int) = row.getInt(ordinal)
+  override def getField(row: Row, ordinal: Int): Int = row.getInt(ordinal)
 
   override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
     to.setInt(toOrdinal, from.getInt(fromOrdinal))
@@ -150,7 +150,7 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
     buffer.putLong(row.getLong(ordinal))
   }
 
-  override def extract(buffer: ByteBuffer) = {
+  override def extract(buffer: ByteBuffer): Long = {
     buffer.getLong()
   }
 
@@ -162,7 +162,7 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
     row.setLong(ordinal, value)
   }
 
-  override def getField(row: Row, ordinal: Int) = row.getLong(ordinal)
+  override def getField(row: Row, ordinal: Int): Long = row.getLong(ordinal)
 
   override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
     to.setLong(toOrdinal, from.getLong(fromOrdinal))
@@ -178,7 +178,7 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
     buffer.putFloat(row.getFloat(ordinal))
   }
 
-  override def extract(buffer: ByteBuffer) = {
+  override def extract(buffer: ByteBuffer): Float = {
     buffer.getFloat()
   }
 
@@ -190,7 +190,7 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
     row.setFloat(ordinal, value)
   }
 
-  override def getField(row: Row, ordinal: Int) = row.getFloat(ordinal)
+  override def getField(row: Row, ordinal: Int): Float = row.getFloat(ordinal)
 
   override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
     to.setFloat(toOrdinal, from.getFloat(fromOrdinal))
@@ -206,7 +206,7 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
     buffer.putDouble(row.getDouble(ordinal))
   }
 
-  override def extract(buffer: ByteBuffer) = {
+  override def extract(buffer: ByteBuffer): Double = {
     buffer.getDouble()
   }
 
@@ -218,7 +218,7 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
     row.setDouble(ordinal, value)
   }
 
-  override def getField(row: Row, ordinal: Int) = row.getDouble(ordinal)
+  override def getField(row: Row, ordinal: Int): Double = row.getDouble(ordinal)
 
   override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
     to.setDouble(toOrdinal, from.getDouble(fromOrdinal))
@@ -234,7 +234,7 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
     buffer.put(if (row.getBoolean(ordinal)) 1: Byte else 0: Byte)
   }
 
-  override def extract(buffer: ByteBuffer) = buffer.get() == 1
+  override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1
 
   override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
     row.setBoolean(ordinal, buffer.get() == 1)
@@ -244,7 +244,7 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
     row.setBoolean(ordinal, value)
   }
 
-  override def getField(row: Row, ordinal: Int) = row.getBoolean(ordinal)
+  override def getField(row: Row, ordinal: Int): Boolean = row.getBoolean(ordinal)
 
   override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
     to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal))
@@ -260,7 +260,7 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
     buffer.put(row.getByte(ordinal))
   }
 
-  override def extract(buffer: ByteBuffer) = {
+  override def extract(buffer: ByteBuffer): Byte = {
     buffer.get()
   }
 
@@ -272,7 +272,7 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
     row.setByte(ordinal, value)
   }
 
-  override def getField(row: Row, ordinal: Int) = row.getByte(ordinal)
+  override def getField(row: Row, ordinal: Int): Byte = row.getByte(ordinal)
 
   override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
     to.setByte(toOrdinal, from.getByte(fromOrdinal))
@@ -288,7 +288,7 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
     buffer.putShort(row.getShort(ordinal))
   }
 
-  override def extract(buffer: ByteBuffer) = {
+  override def extract(buffer: ByteBuffer): Short = {
     buffer.getShort()
   }
 
@@ -300,7 +300,7 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
     row.setShort(ordinal, value)
   }
 
-  override def getField(row: Row, ordinal: Int) = row.getShort(ordinal)
+  override def getField(row: Row, ordinal: Int): Short = row.getShort(ordinal)
 
   override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
     to.setShort(toOrdinal, from.getShort(fromOrdinal))
@@ -317,7 +317,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
     buffer.putInt(stringBytes.length).put(stringBytes, 0, stringBytes.length)
   }
 
-  override def extract(buffer: ByteBuffer) = {
+  override def extract(buffer: ByteBuffer): String = {
     val length = buffer.getInt()
     val stringBytes = new Array[Byte](length)
     buffer.get(stringBytes, 0, length)
@@ -328,7 +328,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
     row.setString(ordinal, value)
   }
 
-  override def getField(row: Row, ordinal: Int) = row.getString(ordinal)
+  override def getField(row: Row, ordinal: Int): String = row.getString(ordinal)
 
   override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
     to.setString(toOrdinal, from.getString(fromOrdinal))
@@ -336,7 +336,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
 }
 
 private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
-  override def extract(buffer: ByteBuffer) = {
+  override def extract(buffer: ByteBuffer): Int = {
     buffer.getInt
   }
 
@@ -344,7 +344,7 @@ private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
     buffer.putInt(v)
   }
 
-  override def getField(row: Row, ordinal: Int) = {
+  override def getField(row: Row, ordinal: Int): Int = {
     row(ordinal).asInstanceOf[Int]
   }
 
@@ -354,7 +354,7 @@ private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
 }
 
 private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 12) {
-  override def extract(buffer: ByteBuffer) = {
+  override def extract(buffer: ByteBuffer): Timestamp = {
     val timestamp = new Timestamp(buffer.getLong())
     timestamp.setNanos(buffer.getInt())
     timestamp
@@ -364,7 +364,7 @@ private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 12) {
     buffer.putLong(v.getTime).putInt(v.getNanos)
   }
 
-  override def getField(row: Row, ordinal: Int) = {
+  override def getField(row: Row, ordinal: Int): Timestamp = {
     row(ordinal).asInstanceOf[Timestamp]
   }
 
@@ -405,7 +405,7 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
     defaultSize: Int)
   extends ColumnType[T, Array[Byte]](typeId, defaultSize) {
 
-  override def actualSize(row: Row, ordinal: Int) = {
+  override def actualSize(row: Row, ordinal: Int): Int = {
     getField(row, ordinal).length + 4
   }
 
@@ -413,7 +413,7 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
     buffer.putInt(v.length).put(v, 0, v.length)
   }
 
-  override def extract(buffer: ByteBuffer) = {
+  override def extract(buffer: ByteBuffer): Array[Byte] = {
     val length = buffer.getInt()
     val bytes = new Array[Byte](length)
     buffer.get(bytes, 0, length)
@@ -426,7 +426,9 @@ private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](11, 16)
     row(ordinal) = value
   }
 
-  override def getField(row: Row, ordinal: Int) = row(ordinal).asInstanceOf[Array[Byte]]
+  override def getField(row: Row, ordinal: Int): Array[Byte] = {
+    row(ordinal).asInstanceOf[Array[Byte]]
+  }
 }
 
 // Used to process generic objects (all types other than those listed above). Objects should be
@@ -437,7 +439,9 @@ private[sql] object GENERIC extends ByteArrayColumnType[DataType](12, 16) {
     row(ordinal) = SparkSqlSerializer.deserialize[Any](value)
   }
 
-  override def getField(row: Row, ordinal: Int) = SparkSqlSerializer.serialize(row(ordinal))
+  override def getField(row: Row, ordinal: Int): Array[Byte] = {
+    SparkSqlSerializer.serialize(row(ordinal))
+  }
 }
 
 private[sql] object ColumnType {

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 387faee..6eee0c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.columnar
 
 import java.nio.ByteBuffer
 
+import org.apache.spark.Accumulator
+import org.apache.spark.sql.catalyst.expressions
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
@@ -77,20 +80,23 @@ private[sql] case class InMemoryRelation(
     _statistics
   }
 
-  override def statistics = if (_statistics == null) {
-    if (batchStats.value.isEmpty) {
-      // Underlying columnar RDD hasn't been materialized, no useful statistics information
-      // available, return the default statistics.
-      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+  override def statistics: Statistics = {
+    if (_statistics == null) {
+      if (batchStats.value.isEmpty) {
+        // Underlying columnar RDD hasn't been materialized, no useful statistics information
+        // available, return the default statistics.
+        Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+      } else {
+        // Underlying columnar RDD has been materialized, required information has also been
+        // collected via the `batchStats` accumulator, compute the final statistics,
+        // and update `_statistics`.
+        _statistics = Statistics(sizeInBytes = computeSizeInBytes)
+        _statistics
+      }
     } else {
-      // Underlying columnar RDD has been materialized, required information has also been collected
-      // via the `batchStats` accumulator, compute the final statistics, and update `_statistics`.
-      _statistics = Statistics(sizeInBytes = computeSizeInBytes)
+      // Pre-computed statistics
       _statistics
     }
-  } else {
-    // Pre-computed statistics
-    _statistics
   }
 
   // If the cached column buffers were not passed in, we calculate them in the constructor.
@@ -99,7 +105,7 @@ private[sql] case class InMemoryRelation(
     buildBuffers()
   }
 
-  def recache() = {
+  def recache(): Unit = {
     _cachedColumnBuffers.unpersist()
     _cachedColumnBuffers = null
     buildBuffers()
@@ -109,7 +115,7 @@ private[sql] case class InMemoryRelation(
     val output = child.output
     val cached = child.execute().mapPartitions { rowIterator =>
       new Iterator[CachedBatch] {
-        def next() = {
+        def next(): CachedBatch = {
           val columnBuilders = output.map { attribute =>
             val columnType = ColumnType(attribute.dataType)
             val initialBufferSize = columnType.defaultSize * batchSize
@@ -144,7 +150,7 @@ private[sql] case class InMemoryRelation(
           CachedBatch(columnBuilders.map(_.build().array()), stats)
         }
 
-        def hasNext = rowIterator.hasNext
+        def hasNext: Boolean = rowIterator.hasNext
       }
     }.persist(storageLevel)
 
@@ -158,9 +164,9 @@ private[sql] case class InMemoryRelation(
       _cachedColumnBuffers, statisticsToBePropagated)
   }
 
-  override def children = Seq.empty
+  override def children: Seq[LogicalPlan] = Seq.empty
 
-  override def newInstance() = {
+  override def newInstance(): this.type = {
     new InMemoryRelation(
       output.map(_.newInstance()),
       useCompression,
@@ -172,7 +178,7 @@ private[sql] case class InMemoryRelation(
       statisticsToBePropagated).asInstanceOf[this.type]
   }
 
-  def cachedColumnBuffers = _cachedColumnBuffers
+  def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
 
   override protected def otherCopyArgs: Seq[AnyRef] =
     Seq(_cachedColumnBuffers, statisticsToBePropagated)
@@ -220,7 +226,7 @@ private[sql] case class InMemoryColumnarTableScan(
     case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0
   }
 
-  val partitionFilters = {
+  val partitionFilters: Seq[Expression] = {
     predicates.flatMap { p =>
       val filter = buildFilter.lift(p)
       val boundFilter =
@@ -239,12 +245,12 @@ private[sql] case class InMemoryColumnarTableScan(
   }
 
   // Accumulators used for testing purposes
-  val readPartitions = sparkContext.accumulator(0)
-  val readBatches = sparkContext.accumulator(0)
+  val readPartitions: Accumulator[Int] = sparkContext.accumulator(0)
+  val readBatches: Accumulator[Int] = sparkContext.accumulator(0)
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     readPartitions.setValue(0)
     readBatches.setValue(0)
 
@@ -271,7 +277,7 @@ private[sql] case class InMemoryColumnarTableScan(
 
       val nextRow = new SpecificMutableRow(requestedColumnDataTypes)
 
-      def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]) = {
+      def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]): Iterator[Row] = {
         val rows = cacheBatches.flatMap { cachedBatch =>
           // Build column accessors
           val columnAccessors = requestedColumnIndices.map { batchColumnIndex =>
@@ -283,7 +289,7 @@ private[sql] case class InMemoryColumnarTableScan(
           // Extract rows via column accessors
           new Iterator[Row] {
             private[this] val rowLen = nextRow.length
-            override def next() = {
+            override def next(): Row = {
               var i = 0
               while (i < rowLen) {
                 columnAccessors(i).extractTo(nextRow, i)
@@ -292,7 +298,7 @@ private[sql] case class InMemoryColumnarTableScan(
               nextRow
             }
 
-            override def hasNext = columnAccessors(0).hasNext
+            override def hasNext: Boolean = columnAccessors(0).hasNext
           }
         }
 
@@ -308,7 +314,7 @@ private[sql] case class InMemoryColumnarTableScan(
         if (inMemoryPartitionPruningEnabled) {
           cachedBatchIterator.filter { cachedBatch =>
             if (!partitionFilter(cachedBatch.stats)) {
-              def statsString = relation.partitionStatistics.schema
+              def statsString: String = relation.partitionStatistics.schema
                 .zip(cachedBatch.stats.toSeq)
                 .map { case (a, s) => s"${a.name}: $s" }
                 .mkString(", ")

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
index 965782a..4d35650 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
@@ -55,5 +55,5 @@ private[sql] trait NullableColumnAccessor extends ColumnAccessor {
     pos += 1
   }
 
-  abstract override def hasNext = seenNulls < nullCount || super.hasNext
+  abstract override def hasNext: Boolean = seenNulls < nullCount || super.hasNext
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
index 7dff9de..d0b602a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
@@ -26,12 +26,12 @@ private[sql] trait CompressibleColumnAccessor[T <: NativeType] extends ColumnAcc
 
   private var decoder: Decoder[T] = _
 
-  abstract override protected def initialize() = {
+  abstract override protected def initialize(): Unit = {
     super.initialize()
     decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType)
   }
 
-  abstract override def hasNext = super.hasNext || decoder.hasNext
+  abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext
 
   override def extractSingle(row: MutableRow, ordinal: Int): Unit = {
     decoder.next(row, ordinal)

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index aead768..b9cfc5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -81,7 +81,7 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
     }
   }
 
-  override def build() = {
+  override def build(): ByteBuffer = {
     val nonNullBuffer = buildNonNulls()
     val typeId = nonNullBuffer.getInt()
     val encoder: Encoder[T] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index 68a5b1d..8727d71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -33,22 +33,23 @@ import org.apache.spark.util.Utils
 private[sql] case object PassThrough extends CompressionScheme {
   override val typeId = 0
 
-  override def supports(columnType: ColumnType[_, _]) = true
+  override def supports(columnType: ColumnType[_, _]): Boolean = true
 
-  override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = {
+  override def encoder[T <: NativeType](columnType: NativeColumnType[T]): Encoder[T] = {
     new this.Encoder[T](columnType)
   }
 
-  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
+  override def decoder[T <: NativeType](
+      buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = {
     new this.Decoder(buffer, columnType)
   }
 
   class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T] {
-    override def uncompressedSize = 0
+    override def uncompressedSize: Int = 0
 
-    override def compressedSize = 0
+    override def compressedSize: Int = 0
 
-    override def compress(from: ByteBuffer, to: ByteBuffer) = {
+    override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = {
       // Writes compression type ID and copies raw contents
       to.putInt(PassThrough.typeId).put(from).rewind()
       to
@@ -62,22 +63,23 @@ private[sql] case object PassThrough extends CompressionScheme {
       columnType.extract(buffer, row, ordinal)
     }
 
-    override def hasNext = buffer.hasRemaining
+    override def hasNext: Boolean = buffer.hasRemaining
   }
 }
 
 private[sql] case object RunLengthEncoding extends CompressionScheme {
   override val typeId = 1
 
-  override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = {
+  override def encoder[T <: NativeType](columnType: NativeColumnType[T]): Encoder[T] = {
     new this.Encoder[T](columnType)
   }
 
-  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
+  override def decoder[T <: NativeType](
+      buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = {
     new this.Decoder(buffer, columnType)
   }
 
-  override def supports(columnType: ColumnType[_, _]) = columnType match {
+  override def supports(columnType: ColumnType[_, _]): Boolean = columnType match {
     case INT | LONG | SHORT | BYTE | STRING | BOOLEAN => true
     case _ => false
   }
@@ -90,9 +92,9 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
     private val lastValue = new SpecificMutableRow(Seq(columnType.dataType))
     private var lastRun = 0
 
-    override def uncompressedSize = _uncompressedSize
+    override def uncompressedSize: Int = _uncompressedSize
 
-    override def compressedSize = _compressedSize
+    override def compressedSize: Int = _compressedSize
 
     override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
       val value = columnType.getField(row, ordinal)
@@ -114,7 +116,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
       }
     }
 
-    override def compress(from: ByteBuffer, to: ByteBuffer) = {
+    override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = {
       to.putInt(RunLengthEncoding.typeId)
 
       if (from.hasRemaining) {
@@ -169,7 +171,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
       columnType.setField(row, ordinal, currentValue)
     }
 
-    override def hasNext = valueCount < run || buffer.hasRemaining
+    override def hasNext: Boolean = valueCount < run || buffer.hasRemaining
   }
 }
 
@@ -179,15 +181,16 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
   // 32K unique values allowed
   val MAX_DICT_SIZE = Short.MaxValue
 
-  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
+  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    : Decoder[T] = {
     new this.Decoder(buffer, columnType)
   }
 
-  override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = {
+  override def encoder[T <: NativeType](columnType: NativeColumnType[T]): Encoder[T] = {
     new this.Encoder[T](columnType)
   }
 
-  override def supports(columnType: ColumnType[_, _]) = columnType match {
+  override def supports(columnType: ColumnType[_, _]): Boolean = columnType match {
     case INT | LONG | STRING => true
     case _ => false
   }
@@ -237,7 +240,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
       }
     }
 
-    override def compress(from: ByteBuffer, to: ByteBuffer) = {
+    override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = {
       if (overflow) {
         throw new IllegalStateException(
           "Dictionary encoding should not be used because of dictionary overflow.")
@@ -260,9 +263,9 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
       to
     }
 
-    override def uncompressedSize = _uncompressedSize
+    override def uncompressedSize: Int = _uncompressedSize
 
-    override def compressedSize = if (overflow) Int.MaxValue else dictionarySize + count * 2
+    override def compressedSize: Int = if (overflow) Int.MaxValue else dictionarySize + count * 2
   }
 
   class Decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T])
@@ -284,7 +287,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
       columnType.setField(row, ordinal, dictionary(buffer.getShort()))
     }
 
-    override def hasNext = buffer.hasRemaining
+    override def hasNext: Boolean = buffer.hasRemaining
   }
 }
 
@@ -293,15 +296,16 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
 
   val BITS_PER_LONG = 64
 
-  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
+  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    : compression.Decoder[T] = {
     new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]]
   }
 
-  override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = {
+  override def encoder[T <: NativeType](columnType: NativeColumnType[T]): compression.Encoder[T] = {
     (new this.Encoder).asInstanceOf[compression.Encoder[T]]
   }
 
-  override def supports(columnType: ColumnType[_, _]) = columnType == BOOLEAN
+  override def supports(columnType: ColumnType[_, _]): Boolean = columnType == BOOLEAN
 
   class Encoder extends compression.Encoder[BooleanType.type] {
     private var _uncompressedSize = 0
@@ -310,7 +314,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
       _uncompressedSize += BOOLEAN.defaultSize
     }
 
-    override def compress(from: ByteBuffer, to: ByteBuffer) = {
+    override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = {
       to.putInt(BooleanBitSet.typeId)
         // Total element count (1 byte per Boolean value)
         .putInt(from.remaining)
@@ -347,9 +351,9 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
       to
     }
 
-    override def uncompressedSize = _uncompressedSize
+    override def uncompressedSize: Int = _uncompressedSize
 
-    override def compressedSize = {
+    override def compressedSize: Int = {
       val extra = if (_uncompressedSize % BITS_PER_LONG == 0) 0 else 1
       (_uncompressedSize / BITS_PER_LONG + extra) * 8 + 4
     }
@@ -380,22 +384,23 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
 private[sql] case object IntDelta extends CompressionScheme {
   override def typeId: Int = 4
 
-  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
+  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    : compression.Decoder[T] = {
     new Decoder(buffer, INT).asInstanceOf[compression.Decoder[T]]
   }
 
-  override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = {
+  override def encoder[T <: NativeType](columnType: NativeColumnType[T]): compression.Encoder[T] = {
     (new Encoder).asInstanceOf[compression.Encoder[T]]
   }
 
-  override def supports(columnType: ColumnType[_, _]) = columnType == INT
+  override def supports(columnType: ColumnType[_, _]): Boolean = columnType == INT
 
   class Encoder extends compression.Encoder[IntegerType.type] {
     protected var _compressedSize: Int = 0
     protected var _uncompressedSize: Int = 0
 
-    override def compressedSize = _compressedSize
-    override def uncompressedSize = _uncompressedSize
+    override def compressedSize: Int = _compressedSize
+    override def uncompressedSize: Int = _uncompressedSize
 
     private var prevValue: Int = _
 
@@ -459,22 +464,23 @@ private[sql] case object IntDelta extends CompressionScheme {
 private[sql] case object LongDelta extends CompressionScheme {
   override def typeId: Int = 5
 
-  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
+  override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    : compression.Decoder[T] = {
     new Decoder(buffer, LONG).asInstanceOf[compression.Decoder[T]]
   }
 
-  override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = {
+  override def encoder[T <: NativeType](columnType: NativeColumnType[T]): compression.Encoder[T] = {
     (new Encoder).asInstanceOf[compression.Encoder[T]]
   }
 
-  override def supports(columnType: ColumnType[_, _]) = columnType == LONG
+  override def supports(columnType: ColumnType[_, _]): Boolean = columnType == LONG
 
   class Encoder extends compression.Encoder[LongType.type] {
     protected var _compressedSize: Int = 0
     protected var _uncompressedSize: Int = 0
 
-    override def compressedSize = _compressedSize
-    override def uncompressedSize = _uncompressedSize
+    override def compressedSize: Int = _compressedSize
+    override def uncompressedSize: Int = _uncompressedSize
 
     private var prevValue: Long = _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index ad44a01..18b1ba4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -21,6 +21,7 @@ import java.util.HashMap
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -45,7 +46,7 @@ case class Aggregate(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def requiredChildDistribution =
+  override def requiredChildDistribution: List[Distribution] = {
     if (partial) {
       UnspecifiedDistribution :: Nil
     } else {
@@ -55,8 +56,9 @@ case class Aggregate(
         ClusteredDistribution(groupingExpressions) :: Nil
       }
     }
+  }
 
-  override def output = aggregateExpressions.map(_.toAttribute)
+  override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
 
   /**
    * An aggregate that needs to be computed for each row in a group.
@@ -119,7 +121,7 @@ case class Aggregate(
     }
   }
 
-  override def execute() = attachTree(this, "execute") {
+  override def execute(): RDD[Row] = attachTree(this, "execute") {
     if (groupingExpressions.isEmpty) {
       child.execute().mapPartitions { iter =>
         val buffer = newAggregateBuffer()

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 7c0b72a..437408d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf}
-import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions.RowOrdering
+import org.apache.spark.sql.catalyst.expressions.{Attribute, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
@@ -34,9 +35,9 @@ import org.apache.spark.util.MutablePair
 @DeveloperApi
 case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
 
-  override def outputPartitioning = newPartitioning
+  override def outputPartitioning: Partitioning = newPartitioning
 
-  override def output = child.output
+  override def output: Seq[Attribute] = child.output
 
   /** We must copy rows when sort based shuffle is on */
   protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
@@ -44,7 +45,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   private val bypassMergeThreshold =
     child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
 
-  override def execute() = attachTree(this , "execute") {
+  override def execute(): RDD[Row] = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
@@ -123,13 +124,13 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
  */
 private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  def numPartitions = sqlContext.conf.numShufflePartitions
+  def numPartitions: Int = sqlContext.conf.numShufflePartitions
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case operator: SparkPlan =>
       // Check if every child's outputPartitioning satisfies the corresponding
       // required data distribution.
-      def meetsRequirements =
+      def meetsRequirements: Boolean =
         !operator.requiredChildDistribution.zip(operator.children).map {
           case (required, child) =>
             val valid = child.outputPartitioning.satisfies(required)
@@ -147,7 +148,7 @@ private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPl
       // datasets are both clustered by "a", but these two outputPartitionings are not
       // compatible.
       // TODO: ASSUMES TRANSITIVITY?
-      def compatible =
+      def compatible: Boolean =
         !operator.children
           .map(_.outputPartitioning)
           .sliding(2)
@@ -158,7 +159,7 @@ private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPl
 
       // Check if the partitioning we want to ensure is the same as the child's output
       // partitioning. If so, we do not need to add the Exchange operator.
-      def addExchangeIfNecessary(partitioning: Partitioning, child: SparkPlan) =
+      def addExchangeIfNecessary(partitioning: Partitioning, child: SparkPlan): SparkPlan =
         if (child.outputPartitioning != partitioning) Exchange(partitioning, child) else child
 
       if (meetsRequirements && compatible) {

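The compatible check above is cut off by the hunk boundary, but what it starts is a sliding-window pass over adjacent children's output partitionings. A generic sketch of that shape, with a toy predicate standing in for the real partitioning comparison:

object CompatibilityDemo {
  // True when every adjacent pair of elements satisfies the predicate in both
  // directions; a single element (or none) is trivially fine, mirroring how a
  // sliding(2) pass degenerates for short sequences.
  def pairwiseCompatible[A](xs: Seq[A])(compatible: (A, A) => Boolean): Boolean =
    xs.sliding(2).forall {
      case Seq(_)    => true
      case Seq(a, b) => compatible(a, b) && compatible(b, a)
    }
}
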
http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 248dc15..d895572 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -26,6 +26,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.types.StructType
 
+import scala.collection.immutable
+
 /**
  * :: DeveloperApi ::
  */
@@ -58,17 +60,17 @@ object RDDConversions {
 case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLContext)
   extends LogicalPlan with MultiInstanceRelation {
 
-  override def children = Nil
+  override def children: Seq[LogicalPlan] = Nil
 
-  override def newInstance() =
+  override def newInstance(): LogicalRDD.this.type =
     LogicalRDD(output.map(_.newInstance()), rdd)(sqlContext).asInstanceOf[this.type]
 
-  override def sameResult(plan: LogicalPlan) = plan match {
+  override def sameResult(plan: LogicalPlan): Boolean = plan match {
     case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id
     case _ => false
   }
 
-  @transient override lazy val statistics = Statistics(
+  @transient override lazy val statistics: Statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
     sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes)
@@ -77,24 +79,24 @@ case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLCont
 
 /** Physical plan node for scanning data from an RDD. */
 case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
-  override def execute() = rdd
+  override def execute(): RDD[Row] = rdd
 }
 
 /** Logical plan node for scanning data from a local collection. */
 case class LogicalLocalTable(output: Seq[Attribute], rows: Seq[Row])(sqlContext: SQLContext)
    extends LogicalPlan with MultiInstanceRelation {
 
-  override def children = Nil
+  override def children: Seq[LogicalPlan] = Nil
 
-  override def newInstance() =
+  override def newInstance(): this.type =
     LogicalLocalTable(output.map(_.newInstance()), rows)(sqlContext).asInstanceOf[this.type]
 
-  override def sameResult(plan: LogicalPlan) = plan match {
+  override def sameResult(plan: LogicalPlan): Boolean = plan match {
     case LogicalRDD(_, otherRDD) => rows == rows
     case _ => false
   }
 
-  @transient override lazy val statistics = Statistics(
+  @transient override lazy val statistics: Statistics = Statistics(
     // TODO: Improve the statistics estimation.
     // This is made small enough so it can be broadcasted.
     sizeInBytes = sqlContext.conf.autoBroadcastJoinThreshold - 1

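Two of the annotated overrides above use singleton result types; LogicalRDD.this.type is just this.type written with an explicit prefix. A toy sketch (ToyRelation is an illustrative name, not a catalyst class) of why the asInstanceOf cast appears with that return type:

trait Relation {
  def newInstance(): this.type
}

case class ToyRelation(id: Int) extends Relation {
  // The declared result is the singleton type of the receiver, so a freshly
  // constructed copy (whose static type is only ToyRelation) has to be cast
  // back to this.type, as LogicalRDD and LogicalLocalTable do above.
  override def newInstance(): this.type =
    ToyRelation(id).asInstanceOf[this.type]
}
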
http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index 9517242..5758494 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
@@ -42,7 +43,7 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
 
-  override def execute() = attachTree(this, "execute") {
+  override def execute(): RDD[Row] = attachTree(this, "execute") {
     child.execute().mapPartitions { iter =>
       // TODO Move out projection objects creation and transfer to
       // workers via closure. However we can't assume the Projection
@@ -55,7 +56,7 @@ case class Expand(
         private[this] var idx = -1  // -1 means the initial state
         private[this] var input: Row = _
 
-        override final def hasNext = (-1 < idx && idx < groups.length) || iter.hasNext
+        override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || iter.hasNext
 
         override final def next(): Row = {
           if (idx <= 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 38877c2..1227104 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 
 /**
@@ -54,7 +55,7 @@ case class Generate(
 
   val boundGenerator = BindReferences.bindReference(generator, child.output)
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     if (join) {
       child.execute().mapPartitions { iter =>
         val nullValues = Seq.fill(generator.output.size)(Literal(null))

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 4abe26f..89682d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.trees._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -49,7 +50,7 @@ case class GeneratedAggregate(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def requiredChildDistribution =
+  override def requiredChildDistribution: Seq[Distribution] =
     if (partial) {
       UnspecifiedDistribution :: Nil
     } else {
@@ -60,9 +61,9 @@ case class GeneratedAggregate(
       }
     }
 
-  override def output = aggregateExpressions.map(_.toAttribute)
+  override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     val aggregatesToCompute = aggregateExpressions.flatMap { a =>
       a.collect { case agg: AggregateExpression => agg}
     }
@@ -271,9 +272,9 @@ case class GeneratedAggregate(
           private[this] val resultIterator = buffers.entrySet.iterator()
           private[this] val resultProjection = resultProjectionBuilder()
 
-          def hasNext = resultIterator.hasNext
+          def hasNext: Boolean = resultIterator.hasNext
 
-          def next() = {
+          def next(): Row = {
             val currentGroup = resultIterator.next()
             resultProjection(joinedRow(currentGroup.getKey, currentGroup.getValue))
           }

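The result iterator in the hunk above hand-rolls hasNext/next over the entry set of a java.util.HashMap of aggregation buffers. A small stand-alone sketch of that adapter pattern (names are illustrative):

import java.util.{HashMap => JHashMap}

object BufferIteratorDemo {
  // Wraps a Java entry iterator as a Scala Iterator, projecting each
  // (group key, aggregation buffer) entry into a result value on the fly.
  def resultRows[K, V, R](buffers: JHashMap[K, V])(project: (K, V) => R): Iterator[R] = {
    val entries = buffers.entrySet.iterator()
    new Iterator[R] {
      override def hasNext: Boolean = entries.hasNext
      override def next(): R = {
        val entry = entries.next()
        project(entry.getKey, entry.getValue)
      }
    }
  }
}
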
http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index d3a18b3..5bd699a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -29,11 +30,11 @@ case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) extends LeafNo
 
   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
 
-  override def execute() = rdd
+  override def execute(): RDD[Row] = rdd
 
-  override def executeCollect() =
+  override def executeCollect(): Array[Row] =
     rows.map(ScalaReflection.convertRowToScala(_, schema)).toArray
 
-  override def executeTake(limit: Int) =
+  override def executeTake(limit: Int): Array[Row] =
     rows.map(ScalaReflection.convertRowToScala(_, schema)).take(limit).toArray
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 052766c..d239637 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -67,6 +67,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   // TODO: Move to `DistributedPlan`
   /** Specifies how data is partitioned across different nodes in the cluster. */
   def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!
+
   /** Specifies any partition requirements on the input data for this operator. */
   def requiredChildDistribution: Seq[Distribution] =
     Seq.fill(children.size)(UnspecifiedDistribution)

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 30564e1..c4534fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -74,7 +74,7 @@ private[execution] class KryoResourcePool(size: Int)
     new KryoSerializer(sparkConf)
   }
 
-  def newInstance() = ser.newInstance()
+  def newInstance(): SerializerInstance = ser.newInstance()
 }
 
 private[sql] object SparkSqlSerializer {

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5281c75..2b58115 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -154,7 +154,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => Nil
     }
 
-    def canBeCodeGened(aggs: Seq[AggregateExpression]) = !aggs.exists {
+    def canBeCodeGened(aggs: Seq[AggregateExpression]): Boolean = !aggs.exists {
       case _: Sum | _: Count | _: Max | _: CombineSetsAndCount => false
       // The generated set implementation is pretty limited ATM.
       case CollectHashSet(exprs) if exprs.size == 1  &&
@@ -162,7 +162,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => true
     }
 
-    def allAggregates(exprs: Seq[Expression]) =
+    def allAggregates(exprs: Seq[Expression]): Seq[AggregateExpression] =
       exprs.flatMap(_.collect { case a: AggregateExpression => a })
   }
 
@@ -257,7 +257,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
-    def numPartitions = self.numPartitions
+    def numPartitions: Int = self.numPartitions
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case r: RunnableCommand => ExecutedCommand(r) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 7102685..20c9bc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -24,7 +24,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.collection.ExternalSorter
 
@@ -33,11 +33,11 @@ import org.apache.spark.util.collection.ExternalSorter
  */
 @DeveloperApi
 case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
-  override def output = projectList.map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
   @transient lazy val buildProjection = newMutableProjection(projectList, child.output)
 
-  override def execute() = child.execute().mapPartitions { iter =>
+  override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
     val resuableProjection = buildProjection()
     iter.map(resuableProjection)
   }
@@ -48,11 +48,11 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
  */
 @DeveloperApi
 case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
-  override def output = child.output
+  override def output: Seq[Attribute] = child.output
 
-  @transient lazy val conditionEvaluator = newPredicate(condition, child.output)
+  @transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output)
 
-  override def execute() = child.execute().mapPartitions { iter =>
+  override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
     iter.filter(conditionEvaluator)
   }
 }
@@ -64,10 +64,12 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
 case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
   extends UnaryNode
 {
-  override def output = child.output
+  override def output: Seq[Attribute] = child.output
 
   // TODO: How to pick seed?
-  override def execute() = child.execute().map(_.copy()).sample(withReplacement, fraction, seed)
+  override def execute(): RDD[Row] = {
+    child.execute().map(_.copy()).sample(withReplacement, fraction, seed)
+  }
 }
 
 /**
@@ -76,8 +78,8 @@ case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child:
 @DeveloperApi
 case class Union(children: Seq[SparkPlan]) extends SparkPlan {
   // TODO: attributes output by union should be distinct for nullability purposes
-  override def output = children.head.output
-  override def execute() = sparkContext.union(children.map(_.execute()))
+  override def output: Seq[Attribute] = children.head.output
+  override def execute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
 }
 
 /**
@@ -97,12 +99,12 @@ case class Limit(limit: Int, child: SparkPlan)
   /** We must copy rows when sort based shuffle is on */
   private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
 
-  override def output = child.output
-  override def outputPartitioning = SinglePartition
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
 
   override def executeCollect(): Array[Row] = child.executeTake(limit)
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
       child.execute().mapPartitions { iter =>
         iter.take(limit).map(row => (false, row.copy()))
@@ -129,20 +131,21 @@ case class Limit(limit: Int, child: SparkPlan)
 @DeveloperApi
 case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode {
 
-  override def output = child.output
-  override def outputPartitioning = SinglePartition
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = SinglePartition
 
-  val ord = new RowOrdering(sortOrder, child.output)
+  private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
 
-  private def collectData() = child.execute().map(_.copy()).takeOrdered(limit)(ord)
+  private def collectData(): Array[Row] = child.execute().map(_.copy()).takeOrdered(limit)(ord)
 
   // TODO: Is this copying for no reason?
-  override def executeCollect() =
+  override def executeCollect(): Array[Row] =
     collectData().map(ScalaReflection.convertRowToScala(_, this.schema))
 
   // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
-  override def execute() = sparkContext.makeRDD(collectData(), 1)
+  override def execute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
 }
 
 /**
@@ -157,17 +160,17 @@ case class Sort(
     global: Boolean,
     child: SparkPlan)
   extends UnaryNode {
-  override def requiredChildDistribution =
+  override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
-  override def execute() = attachTree(this, "sort") {
+  override def execute(): RDD[Row] = attachTree(this, "sort") {
     child.execute().mapPartitions( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
       iterator.map(_.copy()).toArray.sorted(ordering).iterator
     }, preservesPartitioning = true)
   }
 
-  override def output = child.output
+  override def output: Seq[Attribute] = child.output
 }
 
 /**
@@ -182,10 +185,11 @@ case class ExternalSort(
     global: Boolean,
     child: SparkPlan)
   extends UnaryNode {
-  override def requiredChildDistribution =
+
+  override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
-  override def execute() = attachTree(this, "sort") {
+  override def execute(): RDD[Row] = attachTree(this, "sort") {
     child.execute().mapPartitions( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
       val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
@@ -194,7 +198,7 @@ case class ExternalSort(
     }, preservesPartitioning = true)
   }
 
-  override def output = child.output
+  override def output: Seq[Attribute] = child.output
 }
 
 /**
@@ -206,12 +210,12 @@ case class ExternalSort(
  */
 @DeveloperApi
 case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
-  override def output = child.output
+  override def output: Seq[Attribute] = child.output
 
-  override def requiredChildDistribution =
+  override def requiredChildDistribution: Seq[Distribution] =
     if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     child.execute().mapPartitions { iter =>
       val hashSet = new scala.collection.mutable.HashSet[Row]()
 
@@ -236,9 +240,9 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
  */
 @DeveloperApi
 case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
-  override def output = left.output
+  override def output: Seq[Attribute] = left.output
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
   }
 }
@@ -250,9 +254,9 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
  */
 @DeveloperApi
 case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
-  override def output = children.head.output
+  override def output: Seq[Attribute] = children.head.output
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
   }
 }
@@ -265,6 +269,7 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
  */
 @DeveloperApi
 case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
-  def children = child :: Nil
-  def execute() = child.execute()
+  def children: Seq[SparkPlan] = child :: Nil
+
+  def execute(): RDD[Row] = child.execute()
 }

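Project and Filter above share the same execute shape: build the projection or predicate once per partition inside mapPartitions, then stream rows through it. A plain-Scala sketch of that shape, with Row reduced to a Seq[Any] stand-in:

object PartitionOps {
  type Row = Seq[Any]

  // The projection is constructed once per partition iterator, then applied
  // row by row, so the construction is not repeated for every row.
  def project(partition: Iterator[Row], buildProjection: () => Row => Row): Iterator[Row] = {
    val projection = buildProjection()
    partition.map(projection)
  }

  // Filtering needs no per-partition setup beyond the supplied predicate.
  def filter(partition: Iterator[Row], predicate: Row => Boolean): Iterator[Row] =
    partition.filter(predicate)
}
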
http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index a112321..fad7a28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import scala.collection.mutable.ArrayBuffer
 
 /**
  * A logical command that is executed for its side-effects.  `RunnableCommand`s are
@@ -54,9 +53,9 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
    */
   protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)
 
-  override def output = cmd.output
+  override def output: Seq[Attribute] = cmd.output
 
-  override def children = Nil
+  override def children: Seq[SparkPlan] = Nil
 
   override def executeCollect(): Array[Row] = sideEffectResult.toArray
 
@@ -71,9 +70,10 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
 @DeveloperApi
 case class SetCommand(
     kv: Option[(String, Option[String])],
-    override val output: Seq[Attribute]) extends RunnableCommand with Logging {
+    override val output: Seq[Attribute])
+  extends RunnableCommand with Logging {
 
-  override def run(sqlContext: SQLContext) = kv match {
+  override def run(sqlContext: SQLContext): Seq[Row] = kv match {
     // Configures the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
       logWarning(
@@ -119,10 +119,11 @@ case class ExplainCommand(
     logicalPlan: LogicalPlan,
     override val output: Seq[Attribute] =
       Seq(AttributeReference("plan", StringType, nullable = false)()),
-    extended: Boolean = false) extends RunnableCommand {
+    extended: Boolean = false)
+  extends RunnableCommand {
 
   // Run through the optimizer to generate the physical plan.
-  override def run(sqlContext: SQLContext) = try {
+  override def run(sqlContext: SQLContext): Seq[Row] = try {
     // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
     val queryExecution = sqlContext.executePlan(logicalPlan)
     val outputString = if (extended) queryExecution.toString else queryExecution.simpleString
@@ -140,9 +141,10 @@ case class ExplainCommand(
 case class CacheTableCommand(
     tableName: String,
     plan: Option[LogicalPlan],
-    isLazy: Boolean) extends RunnableCommand {
+    isLazy: Boolean)
+  extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext) = {
+  override def run(sqlContext: SQLContext): Seq[Row] = {
     plan.foreach { logicalPlan =>
       sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName)
     }
@@ -166,7 +168,7 @@ case class CacheTableCommand(
 @DeveloperApi
 case class UncacheTableCommand(tableName: String) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext) = {
+  override def run(sqlContext: SQLContext): Seq[Row] = {
     sqlContext.table(tableName).unpersist(blocking = false)
     Seq.empty[Row]
   }
@@ -181,7 +183,7 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
 @DeveloperApi
 case object ClearCacheCommand extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext) = {
+  override def run(sqlContext: SQLContext): Seq[Row] = {
     sqlContext.clearCache()
     Seq.empty[Row]
   }
@@ -196,9 +198,10 @@ case object ClearCacheCommand extends RunnableCommand {
 case class DescribeCommand(
     child: SparkPlan,
     override val output: Seq[Attribute],
-    isExtended: Boolean) extends RunnableCommand {
+    isExtended: Boolean)
+  extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext) = {
+  override def run(sqlContext: SQLContext): Seq[Row] = {
     child.schema.fields.map { field =>
       val cmtKey = "comment"
       val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
@@ -220,7 +223,7 @@ case class DescribeCommand(
 case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand {
 
   // The result of SHOW TABLES has two columns, tableName and isTemporary.
-  override val output = {
+  override val output: Seq[Attribute] = {
     val schema = StructType(
       StructField("tableName", StringType, false) ::
       StructField("isTemporary", BooleanType, false) :: Nil)
@@ -228,7 +231,7 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma
     schema.toAttributes
   }
 
-  override def run(sqlContext: SQLContext) = {
+  override def run(sqlContext: SQLContext): Seq[Row] = {
     // Since we need to return a Seq of rows, we will call getTables directly
     // instead of calling tables in sqlContext.
     val rows = sqlContext.catalog.getTables(databaseName).map {

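ExecutedCommand earlier in this file memoises the command's result in a lazy val, so the side effects run at most once however the plan is consumed. A plain-Scala sketch of that pattern (ToyCommand and Executed are illustrative names):

object ExecutedCommandDemo {
  type Row = Seq[Any]

  trait ToyCommand { def run(): Seq[Row] }

  class Executed(cmd: ToyCommand) {
    // Lazily memoised: run() fires on first access only, and later reads
    // return the same cached rows.
    lazy val sideEffectResult: Seq[Row] = cmd.run()

    def executeCollect(): Array[Row] = sideEffectResult.toArray
  }
}
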

