Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E0930176A3 for ; Fri, 20 Mar 2015 22:48:48 +0000 (UTC) Received: (qmail 3039 invoked by uid 500); 20 Mar 2015 22:48:42 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 3012 invoked by uid 500); 20 Mar 2015 22:48:42 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 2951 invoked by uid 99); 20 Mar 2015 22:48:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 22:48:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 55E1DE10A8; Fri, 20 Mar 2015 22:48:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rxin@apache.org To: commits@spark.apache.org Date: Fri, 20 Mar 2015 22:48:43 -0000 Message-Id: <5f626d38d5954dc5a2f33fb7ed22400e@git.apache.org> In-Reply-To: <4c1441bcb9af406f8c3ec3d2efc4d3a0@git.apache.org> References: <4c1441bcb9af406f8c3ec3d2efc4d3a0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] spark git commit: [SPARK-6428][SQL] Added explicit type for all public methods in sql/core [SPARK-6428][SQL] Added explicit type for all public methods in sql/core Also implemented equals/hashCode when they are missing. This is done in order to enable automatic public method type checking. Author: Reynold Xin Closes #5104 from rxin/sql-hashcode-explicittype and squashes the following commits: ffce6f3 [Reynold Xin] Code review feedback. 8b36733 [Reynold Xin] [SPARK-6428][SQL] Added explicit type for all public methods. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a95043b1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a95043b1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a95043b1 Branch: refs/heads/master Commit: a95043b1780bfde556db2dcc01511e40a12498dd Parents: 257cde7 Author: Reynold Xin Authored: Fri Mar 20 15:47:07 2015 -0700 Committer: Reynold Xin Committed: Fri Mar 20 15:47:07 2015 -0700 ---------------------------------------------------------------------- .../sql/catalyst/expressions/AttributeSet.scala | 3 +- .../spark/sql/catalyst/expressions/rows.scala | 21 +++++ .../org/apache/spark/sql/types/Decimal.scala | 2 +- .../org/apache/spark/sql/types/dataTypes.scala | 20 ++--- .../scala/org/apache/spark/sql/Column.scala | 2 +- .../scala/org/apache/spark/sql/DataFrame.scala | 6 +- .../scala/org/apache/spark/sql/SQLContext.scala | 8 +- .../org/apache/spark/sql/UDFRegistration.scala | 2 +- .../spark/sql/columnar/ColumnAccessor.scala | 4 +- .../spark/sql/columnar/ColumnBuilder.scala | 4 +- .../apache/spark/sql/columnar/ColumnStats.scala | 24 +++--- .../apache/spark/sql/columnar/ColumnType.scala | 56 +++++++------- .../columnar/InMemoryColumnarTableScan.scala | 56 ++++++++------ .../sql/columnar/NullableColumnAccessor.scala | 2 +- .../CompressibleColumnAccessor.scala | 4 +- .../compression/CompressibleColumnBuilder.scala | 2 +- .../compression/compressionSchemes.scala | 80 +++++++++++--------- .../apache/spark/sql/execution/Aggregate.scala | 8 +- .../apache/spark/sql/execution/Exchange.scala | 19 ++--- .../spark/sql/execution/ExistingRDD.scala | 20 ++--- .../org/apache/spark/sql/execution/Expand.scala | 5 +- .../apache/spark/sql/execution/Generate.scala | 3 +- .../sql/execution/GeneratedAggregate.scala | 11 +-- .../spark/sql/execution/LocalTableScan.scala | 7 +- .../apache/spark/sql/execution/SparkPlan.scala | 1 + .../sql/execution/SparkSqlSerializer.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 6 +- .../spark/sql/execution/basicOperators.scala | 73 +++++++++--------- .../apache/spark/sql/execution/commands.scala | 33 ++++---- .../spark/sql/execution/debug/package.scala | 30 ++++---- .../sql/execution/joins/BroadcastHashJoin.scala | 10 ++- .../joins/BroadcastLeftSemiJoinHash.scala | 10 +-- .../joins/BroadcastNestedLoopJoin.scala | 5 +- .../sql/execution/joins/CartesianProduct.scala | 8 +- .../spark/sql/execution/joins/HashJoin.scala | 4 +- .../sql/execution/joins/HashOuterJoin.scala | 53 ++++++------- .../sql/execution/joins/HashedRelation.scala | 4 +- .../sql/execution/joins/LeftSemiJoinBNL.scala | 10 ++- .../sql/execution/joins/LeftSemiJoinHash.scala | 11 +-- .../sql/execution/joins/ShuffledHashJoin.scala | 6 +- .../apache/spark/sql/execution/pythonUdfs.scala | 21 +++-- .../org/apache/spark/sql/jdbc/JDBCRDD.scala | 3 +- .../apache/spark/sql/jdbc/JDBCRelation.scala | 8 +- .../apache/spark/sql/json/JSONRelation.scala | 6 +- .../spark/sql/parquet/ParquetConverter.scala | 2 +- .../spark/sql/parquet/ParquetRelation.scala | 10 ++- .../sql/parquet/ParquetTableOperations.scala | 12 +-- .../apache/spark/sql/parquet/newParquet.scala | 37 ++++++--- .../spark/sql/parquet/timestamp/NanoTime.scala | 6 +- .../spark/sql/sources/LogicalRelation.scala | 16 ++-- .../org/apache/spark/sql/sources/commands.scala | 2 +- .../org/apache/spark/sql/sources/ddl.scala | 6 +- .../org/apache/spark/sql/sources/rules.scala | 4 +- 53 files changed, 438 insertions(+), 330 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala index a9ba0be..adaeab0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.expressions -import org.apache.spark.sql.catalyst.analysis.Star protected class AttributeEquals(val a: Attribute) { override def hashCode() = a match { @@ -115,7 +114,7 @@ class AttributeSet private (val baseSet: Set[AttributeEquals]) // sorts of things in its closure. override def toSeq: Seq[Attribute] = baseSet.map(_.a).toArray.toSeq - override def toString = "{" + baseSet.map(_.a).mkString(", ") + "}" + override def toString: String = "{" + baseSet.map(_.a).mkString(", ") + "}" override def isEmpty: Boolean = baseSet.isEmpty } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala index faa3667..f03d6f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala @@ -146,6 +146,27 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row { result } + override def equals(o: Any): Boolean = o match { + case other: Row => + if (values.length != other.length) { + return false + } + + var i = 0 + while (i < values.length) { + if (isNullAt(i) != other.isNullAt(i)) { + return false + } + if (apply(i) != other.apply(i)) { + return false + } + i += 1 + } + true + + case _ => false + } + def copy() = this } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala index 21cc6ce..994c520 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala @@ -246,7 +246,7 @@ final class Decimal extends Ordered[Decimal] with Serializable { } } - override def equals(other: Any) = other match { + override def equals(other: Any): Boolean = other match { case d: Decimal => compare(d) == 0 case _ => http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala index bf39603..d973144 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala @@ -307,7 +307,7 @@ protected[sql] object NativeType { protected[sql] trait PrimitiveType extends DataType { - override def isPrimitive = true + override def isPrimitive: Boolean = true } @@ -442,7 +442,7 @@ class TimestampType private() extends NativeType { @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] } private[sql] val ordering = new Ordering[JvmType] { - def compare(x: Timestamp, y: Timestamp) = x.compareTo(y) + def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y) } /** @@ -542,7 +542,7 @@ class LongType private() extends IntegralType { */ override def defaultSize: Int = 8 - override def simpleString = "bigint" + override def simpleString: String = "bigint" private[spark] override def asNullable: LongType = this } @@ -572,7 +572,7 @@ class IntegerType private() extends IntegralType { */ override def defaultSize: Int = 4 - override def simpleString = "int" + override def simpleString: String = "int" private[spark] override def asNullable: IntegerType = this } @@ -602,7 +602,7 @@ class ShortType private() extends IntegralType { */ override def defaultSize: Int = 2 - override def simpleString = "smallint" + override def simpleString: String = "smallint" private[spark] override def asNullable: ShortType = this } @@ -632,7 +632,7 @@ class ByteType private() extends IntegralType { */ override def defaultSize: Int = 1 - override def simpleString = "tinyint" + override def simpleString: String = "tinyint" private[spark] override def asNullable: ByteType = this } @@ -696,7 +696,7 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalT */ override def defaultSize: Int = 4096 - override def simpleString = precisionInfo match { + override def simpleString: String = precisionInfo match { case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)" case None => "decimal(10,0)" } @@ -836,7 +836,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT */ override def defaultSize: Int = 100 * elementType.defaultSize - override def simpleString = s"array<${elementType.simpleString}>" + override def simpleString: String = s"array<${elementType.simpleString}>" private[spark] override def asNullable: ArrayType = ArrayType(elementType.asNullable, containsNull = true) @@ -1065,7 +1065,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru */ override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum - override def simpleString = { + override def simpleString: String = { val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}") s"struct<${fieldTypes.mkString(",")}>" } @@ -1142,7 +1142,7 @@ case class MapType( */ override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize) - override def simpleString = s"map<${keyType.simpleString},${valueType.simpleString}>" + override def simpleString: String = s"map<${keyType.simpleString},${valueType.simpleString}>" private[spark] override def asNullable: MapType = MapType(keyType.asNullable, valueType.asNullable, valueContainsNull = true) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 908c78a..b7a13a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -59,7 +59,7 @@ class Column(protected[sql] val expr: Expression) { override def toString: String = expr.prettyString - override def equals(that: Any) = that match { + override def equals(that: Any): Boolean = that match { case that: Column => that.expr.equals(this.expr) case _ => false } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 46f5070..8b8f86c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.python.SerDeUtil import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.sql.catalyst.{ScalaReflection, SqlParser} +import org.apache.spark.sql.catalyst.{expressions, ScalaReflection, SqlParser} import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, ResolvedStar} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner} @@ -722,7 +722,7 @@ class DataFrame private[sql]( : DataFrame = { val dataType = ScalaReflection.schemaFor[B].dataType val attributes = AttributeReference(outputColumn, dataType)() :: Nil - def rowFunction(row: Row) = { + def rowFunction(row: Row): TraversableOnce[Row] = { f(row(0).asInstanceOf[A]).map(o => Row(ScalaReflection.convertToCatalyst(o, dataType))) } val generator = UserDefinedGenerator(attributes, rowFunction, apply(inputColumn).expr :: Nil) @@ -1155,7 +1155,7 @@ class DataFrame private[sql]( val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) new Iterator[String] { - override def hasNext = iter.hasNext + override def hasNext: Boolean = iter.hasNext override def next(): String = { JsonRDD.rowToJSON(rowSchema, gen)(iter.next()) gen.flush() http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 6de46a5..dc9912b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -144,7 +144,7 @@ class SQLContext(@transient val sparkContext: SparkContext) @transient protected[sql] val tlSession = new ThreadLocal[SQLSession]() { - override def initialValue = defaultSession + override def initialValue: SQLSession = defaultSession } @transient @@ -988,9 +988,9 @@ class SQLContext(@transient val sparkContext: SparkContext) val sqlContext: SQLContext = self - def codegenEnabled = self.conf.codegenEnabled + def codegenEnabled: Boolean = self.conf.codegenEnabled - def numPartitions = self.conf.numShufflePartitions + def numPartitions: Int = self.conf.numShufflePartitions def strategies: Seq[Strategy] = experimental.extraStrategies ++ ( @@ -1109,7 +1109,7 @@ class SQLContext(@transient val sparkContext: SparkContext) lazy val analyzed: LogicalPlan = analyzer(logical) lazy val withCachedData: LogicalPlan = { - assertAnalyzed + assertAnalyzed() cacheManager.useCachedData(analyzed) } lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index 8051df2..b97aaf7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -61,7 +61,7 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging { val dataType = sqlContext.parseDataType(stringDataType) - def builder(e: Seq[Expression]) = + def builder(e: Seq[Expression]): PythonUDF = PythonUDF( name, command, http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala index b615eaa..f615fb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala @@ -48,9 +48,9 @@ private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType]( protected def initialize() {} - def hasNext = buffer.hasRemaining + override def hasNext: Boolean = buffer.hasRemaining - def extractTo(row: MutableRow, ordinal: Int): Unit = { + override def extractTo(row: MutableRow, ordinal: Int): Unit = { extractSingle(row, ordinal) } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index d8d24a5..c881747 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -58,7 +58,7 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType]( override def initialize( initialSize: Int, columnName: String = "", - useCompression: Boolean = false) = { + useCompression: Boolean = false): Unit = { val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize this.columnName = columnName @@ -73,7 +73,7 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType]( columnType.append(row, ordinal, buffer) } - override def build() = { + override def build(): ByteBuffer = { buffer.flip().asInstanceOf[ByteBuffer] } } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala index 04047b9..87a6631 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala @@ -76,7 +76,7 @@ private[sql] sealed trait ColumnStats extends Serializable { private[sql] class NoopColumnStats extends ColumnStats { override def gatherStats(row: Row, ordinal: Int): Unit = super.gatherStats(row, ordinal) - def collectedStatistics = Row(null, null, nullCount, count, 0L) + override def collectedStatistics: Row = Row(null, null, nullCount, count, 0L) } private[sql] class BooleanColumnStats extends ColumnStats { @@ -93,7 +93,7 @@ private[sql] class BooleanColumnStats extends ColumnStats { } } - def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) } private[sql] class ByteColumnStats extends ColumnStats { @@ -110,7 +110,7 @@ private[sql] class ByteColumnStats extends ColumnStats { } } - def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) } private[sql] class ShortColumnStats extends ColumnStats { @@ -127,7 +127,7 @@ private[sql] class ShortColumnStats extends ColumnStats { } } - def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) } private[sql] class LongColumnStats extends ColumnStats { @@ -144,7 +144,7 @@ private[sql] class LongColumnStats extends ColumnStats { } } - def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) } private[sql] class DoubleColumnStats extends ColumnStats { @@ -161,7 +161,7 @@ private[sql] class DoubleColumnStats extends ColumnStats { } } - def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) } private[sql] class FloatColumnStats extends ColumnStats { @@ -178,7 +178,7 @@ private[sql] class FloatColumnStats extends ColumnStats { } } - def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) } private[sql] class FixedDecimalColumnStats extends ColumnStats { @@ -212,7 +212,7 @@ private[sql] class IntColumnStats extends ColumnStats { } } - def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) } private[sql] class StringColumnStats extends ColumnStats { @@ -229,7 +229,7 @@ private[sql] class StringColumnStats extends ColumnStats { } } - def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) } private[sql] class DateColumnStats extends IntColumnStats @@ -248,7 +248,7 @@ private[sql] class TimestampColumnStats extends ColumnStats { } } - def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) } private[sql] class BinaryColumnStats extends ColumnStats { @@ -259,7 +259,7 @@ private[sql] class BinaryColumnStats extends ColumnStats { } } - def collectedStatistics = Row(null, null, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(null, null, nullCount, count, sizeInBytes) } private[sql] class GenericColumnStats extends ColumnStats { @@ -270,5 +270,5 @@ private[sql] class GenericColumnStats extends ColumnStats { } } - def collectedStatistics = Row(null, null, nullCount, count, sizeInBytes) + override def collectedStatistics: Row = Row(null, null, nullCount, count, sizeInBytes) } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index 36ea1c7..c47497e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -98,7 +98,7 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType]( */ def clone(v: JvmType): JvmType = v - override def toString = getClass.getSimpleName.stripSuffix("$") + override def toString: String = getClass.getSimpleName.stripSuffix("$") } private[sql] abstract class NativeColumnType[T <: NativeType]( @@ -114,7 +114,7 @@ private[sql] abstract class NativeColumnType[T <: NativeType]( } private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) { - def append(v: Int, buffer: ByteBuffer): Unit = { + override def append(v: Int, buffer: ByteBuffer): Unit = { buffer.putInt(v) } @@ -122,7 +122,7 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) { buffer.putInt(row.getInt(ordinal)) } - def extract(buffer: ByteBuffer) = { + override def extract(buffer: ByteBuffer): Int = { buffer.getInt() } @@ -134,7 +134,7 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) { row.setInt(ordinal, value) } - override def getField(row: Row, ordinal: Int) = row.getInt(ordinal) + override def getField(row: Row, ordinal: Int): Int = row.getInt(ordinal) override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { to.setInt(toOrdinal, from.getInt(fromOrdinal)) @@ -150,7 +150,7 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) { buffer.putLong(row.getLong(ordinal)) } - override def extract(buffer: ByteBuffer) = { + override def extract(buffer: ByteBuffer): Long = { buffer.getLong() } @@ -162,7 +162,7 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) { row.setLong(ordinal, value) } - override def getField(row: Row, ordinal: Int) = row.getLong(ordinal) + override def getField(row: Row, ordinal: Int): Long = row.getLong(ordinal) override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { to.setLong(toOrdinal, from.getLong(fromOrdinal)) @@ -178,7 +178,7 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) { buffer.putFloat(row.getFloat(ordinal)) } - override def extract(buffer: ByteBuffer) = { + override def extract(buffer: ByteBuffer): Float = { buffer.getFloat() } @@ -190,7 +190,7 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) { row.setFloat(ordinal, value) } - override def getField(row: Row, ordinal: Int) = row.getFloat(ordinal) + override def getField(row: Row, ordinal: Int): Float = row.getFloat(ordinal) override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { to.setFloat(toOrdinal, from.getFloat(fromOrdinal)) @@ -206,7 +206,7 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) { buffer.putDouble(row.getDouble(ordinal)) } - override def extract(buffer: ByteBuffer) = { + override def extract(buffer: ByteBuffer): Double = { buffer.getDouble() } @@ -218,7 +218,7 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) { row.setDouble(ordinal, value) } - override def getField(row: Row, ordinal: Int) = row.getDouble(ordinal) + override def getField(row: Row, ordinal: Int): Double = row.getDouble(ordinal) override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { to.setDouble(toOrdinal, from.getDouble(fromOrdinal)) @@ -234,7 +234,7 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) { buffer.put(if (row.getBoolean(ordinal)) 1: Byte else 0: Byte) } - override def extract(buffer: ByteBuffer) = buffer.get() == 1 + override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1 override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { row.setBoolean(ordinal, buffer.get() == 1) @@ -244,7 +244,7 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) { row.setBoolean(ordinal, value) } - override def getField(row: Row, ordinal: Int) = row.getBoolean(ordinal) + override def getField(row: Row, ordinal: Int): Boolean = row.getBoolean(ordinal) override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal)) @@ -260,7 +260,7 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) { buffer.put(row.getByte(ordinal)) } - override def extract(buffer: ByteBuffer) = { + override def extract(buffer: ByteBuffer): Byte = { buffer.get() } @@ -272,7 +272,7 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) { row.setByte(ordinal, value) } - override def getField(row: Row, ordinal: Int) = row.getByte(ordinal) + override def getField(row: Row, ordinal: Int): Byte = row.getByte(ordinal) override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { to.setByte(toOrdinal, from.getByte(fromOrdinal)) @@ -288,7 +288,7 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) { buffer.putShort(row.getShort(ordinal)) } - override def extract(buffer: ByteBuffer) = { + override def extract(buffer: ByteBuffer): Short = { buffer.getShort() } @@ -300,7 +300,7 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) { row.setShort(ordinal, value) } - override def getField(row: Row, ordinal: Int) = row.getShort(ordinal) + override def getField(row: Row, ordinal: Int): Short = row.getShort(ordinal) override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { to.setShort(toOrdinal, from.getShort(fromOrdinal)) @@ -317,7 +317,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) { buffer.putInt(stringBytes.length).put(stringBytes, 0, stringBytes.length) } - override def extract(buffer: ByteBuffer) = { + override def extract(buffer: ByteBuffer): String = { val length = buffer.getInt() val stringBytes = new Array[Byte](length) buffer.get(stringBytes, 0, length) @@ -328,7 +328,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) { row.setString(ordinal, value) } - override def getField(row: Row, ordinal: Int) = row.getString(ordinal) + override def getField(row: Row, ordinal: Int): String = row.getString(ordinal) override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { to.setString(toOrdinal, from.getString(fromOrdinal)) @@ -336,7 +336,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) { } private[sql] object DATE extends NativeColumnType(DateType, 8, 4) { - override def extract(buffer: ByteBuffer) = { + override def extract(buffer: ByteBuffer): Int = { buffer.getInt } @@ -344,7 +344,7 @@ private[sql] object DATE extends NativeColumnType(DateType, 8, 4) { buffer.putInt(v) } - override def getField(row: Row, ordinal: Int) = { + override def getField(row: Row, ordinal: Int): Int = { row(ordinal).asInstanceOf[Int] } @@ -354,7 +354,7 @@ private[sql] object DATE extends NativeColumnType(DateType, 8, 4) { } private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 12) { - override def extract(buffer: ByteBuffer) = { + override def extract(buffer: ByteBuffer): Timestamp = { val timestamp = new Timestamp(buffer.getLong()) timestamp.setNanos(buffer.getInt()) timestamp @@ -364,7 +364,7 @@ private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 12) { buffer.putLong(v.getTime).putInt(v.getNanos) } - override def getField(row: Row, ordinal: Int) = { + override def getField(row: Row, ordinal: Int): Timestamp = { row(ordinal).asInstanceOf[Timestamp] } @@ -405,7 +405,7 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType]( defaultSize: Int) extends ColumnType[T, Array[Byte]](typeId, defaultSize) { - override def actualSize(row: Row, ordinal: Int) = { + override def actualSize(row: Row, ordinal: Int): Int = { getField(row, ordinal).length + 4 } @@ -413,7 +413,7 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType]( buffer.putInt(v.length).put(v, 0, v.length) } - override def extract(buffer: ByteBuffer) = { + override def extract(buffer: ByteBuffer): Array[Byte] = { val length = buffer.getInt() val bytes = new Array[Byte](length) buffer.get(bytes, 0, length) @@ -426,7 +426,9 @@ private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](11, 16) row(ordinal) = value } - override def getField(row: Row, ordinal: Int) = row(ordinal).asInstanceOf[Array[Byte]] + override def getField(row: Row, ordinal: Int): Array[Byte] = { + row(ordinal).asInstanceOf[Array[Byte]] + } } // Used to process generic objects (all types other than those listed above). Objects should be @@ -437,7 +439,9 @@ private[sql] object GENERIC extends ByteArrayColumnType[DataType](12, 16) { row(ordinal) = SparkSqlSerializer.deserialize[Any](value) } - override def getField(row: Row, ordinal: Int) = SparkSqlSerializer.serialize(row(ordinal)) + override def getField(row: Row, ordinal: Int): Array[Byte] = { + SparkSqlSerializer.serialize(row(ordinal)) + } } private[sql] object ColumnType { http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index 387faee..6eee0c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -19,6 +19,9 @@ package org.apache.spark.sql.columnar import java.nio.ByteBuffer +import org.apache.spark.Accumulator +import org.apache.spark.sql.catalyst.expressions + import scala.collection.mutable.ArrayBuffer import org.apache.spark.rdd.RDD @@ -77,20 +80,23 @@ private[sql] case class InMemoryRelation( _statistics } - override def statistics = if (_statistics == null) { - if (batchStats.value.isEmpty) { - // Underlying columnar RDD hasn't been materialized, no useful statistics information - // available, return the default statistics. - Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) + override def statistics: Statistics = { + if (_statistics == null) { + if (batchStats.value.isEmpty) { + // Underlying columnar RDD hasn't been materialized, no useful statistics information + // available, return the default statistics. + Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) + } else { + // Underlying columnar RDD has been materialized, required information has also been + // collected via the `batchStats` accumulator, compute the final statistics, + // and update `_statistics`. + _statistics = Statistics(sizeInBytes = computeSizeInBytes) + _statistics + } } else { - // Underlying columnar RDD has been materialized, required information has also been collected - // via the `batchStats` accumulator, compute the final statistics, and update `_statistics`. - _statistics = Statistics(sizeInBytes = computeSizeInBytes) + // Pre-computed statistics _statistics } - } else { - // Pre-computed statistics - _statistics } // If the cached column buffers were not passed in, we calculate them in the constructor. @@ -99,7 +105,7 @@ private[sql] case class InMemoryRelation( buildBuffers() } - def recache() = { + def recache(): Unit = { _cachedColumnBuffers.unpersist() _cachedColumnBuffers = null buildBuffers() @@ -109,7 +115,7 @@ private[sql] case class InMemoryRelation( val output = child.output val cached = child.execute().mapPartitions { rowIterator => new Iterator[CachedBatch] { - def next() = { + def next(): CachedBatch = { val columnBuilders = output.map { attribute => val columnType = ColumnType(attribute.dataType) val initialBufferSize = columnType.defaultSize * batchSize @@ -144,7 +150,7 @@ private[sql] case class InMemoryRelation( CachedBatch(columnBuilders.map(_.build().array()), stats) } - def hasNext = rowIterator.hasNext + def hasNext: Boolean = rowIterator.hasNext } }.persist(storageLevel) @@ -158,9 +164,9 @@ private[sql] case class InMemoryRelation( _cachedColumnBuffers, statisticsToBePropagated) } - override def children = Seq.empty + override def children: Seq[LogicalPlan] = Seq.empty - override def newInstance() = { + override def newInstance(): this.type = { new InMemoryRelation( output.map(_.newInstance()), useCompression, @@ -172,7 +178,7 @@ private[sql] case class InMemoryRelation( statisticsToBePropagated).asInstanceOf[this.type] } - def cachedColumnBuffers = _cachedColumnBuffers + def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers override protected def otherCopyArgs: Seq[AnyRef] = Seq(_cachedColumnBuffers, statisticsToBePropagated) @@ -220,7 +226,7 @@ private[sql] case class InMemoryColumnarTableScan( case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0 } - val partitionFilters = { + val partitionFilters: Seq[Expression] = { predicates.flatMap { p => val filter = buildFilter.lift(p) val boundFilter = @@ -239,12 +245,12 @@ private[sql] case class InMemoryColumnarTableScan( } // Accumulators used for testing purposes - val readPartitions = sparkContext.accumulator(0) - val readBatches = sparkContext.accumulator(0) + val readPartitions: Accumulator[Int] = sparkContext.accumulator(0) + val readBatches: Accumulator[Int] = sparkContext.accumulator(0) private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning - override def execute() = { + override def execute(): RDD[Row] = { readPartitions.setValue(0) readBatches.setValue(0) @@ -271,7 +277,7 @@ private[sql] case class InMemoryColumnarTableScan( val nextRow = new SpecificMutableRow(requestedColumnDataTypes) - def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]) = { + def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]): Iterator[Row] = { val rows = cacheBatches.flatMap { cachedBatch => // Build column accessors val columnAccessors = requestedColumnIndices.map { batchColumnIndex => @@ -283,7 +289,7 @@ private[sql] case class InMemoryColumnarTableScan( // Extract rows via column accessors new Iterator[Row] { private[this] val rowLen = nextRow.length - override def next() = { + override def next(): Row = { var i = 0 while (i < rowLen) { columnAccessors(i).extractTo(nextRow, i) @@ -292,7 +298,7 @@ private[sql] case class InMemoryColumnarTableScan( nextRow } - override def hasNext = columnAccessors(0).hasNext + override def hasNext: Boolean = columnAccessors(0).hasNext } } @@ -308,7 +314,7 @@ private[sql] case class InMemoryColumnarTableScan( if (inMemoryPartitionPruningEnabled) { cachedBatchIterator.filter { cachedBatch => if (!partitionFilter(cachedBatch.stats)) { - def statsString = relation.partitionStatistics.schema + def statsString: String = relation.partitionStatistics.schema .zip(cachedBatch.stats.toSeq) .map { case (a, s) => s"${a.name}: $s" } .mkString(", ") http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala index 965782a..4d35650 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala @@ -55,5 +55,5 @@ private[sql] trait NullableColumnAccessor extends ColumnAccessor { pos += 1 } - abstract override def hasNext = seenNulls < nullCount || super.hasNext + abstract override def hasNext: Boolean = seenNulls < nullCount || super.hasNext } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala index 7dff9de..d0b602a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala @@ -26,12 +26,12 @@ private[sql] trait CompressibleColumnAccessor[T <: NativeType] extends ColumnAcc private var decoder: Decoder[T] = _ - abstract override protected def initialize() = { + abstract override protected def initialize(): Unit = { super.initialize() decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType) } - abstract override def hasNext = super.hasNext || decoder.hasNext + abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext override def extractSingle(row: MutableRow, ordinal: Int): Unit = { decoder.next(row, ordinal) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala index aead768..b9cfc5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala @@ -81,7 +81,7 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType] } } - override def build() = { + override def build(): ByteBuffer = { val nonNullBuffer = buildNonNulls() val typeId = nonNullBuffer.getInt() val encoder: Encoder[T] = { http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala index 68a5b1d..8727d71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala @@ -33,22 +33,23 @@ import org.apache.spark.util.Utils private[sql] case object PassThrough extends CompressionScheme { override val typeId = 0 - override def supports(columnType: ColumnType[_, _]) = true + override def supports(columnType: ColumnType[_, _]): Boolean = true - override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = { + override def encoder[T <: NativeType](columnType: NativeColumnType[T]): Encoder[T] = { new this.Encoder[T](columnType) } - override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = { + override def decoder[T <: NativeType]( + buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = { new this.Decoder(buffer, columnType) } class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T] { - override def uncompressedSize = 0 + override def uncompressedSize: Int = 0 - override def compressedSize = 0 + override def compressedSize: Int = 0 - override def compress(from: ByteBuffer, to: ByteBuffer) = { + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { // Writes compression type ID and copies raw contents to.putInt(PassThrough.typeId).put(from).rewind() to @@ -62,22 +63,23 @@ private[sql] case object PassThrough extends CompressionScheme { columnType.extract(buffer, row, ordinal) } - override def hasNext = buffer.hasRemaining + override def hasNext: Boolean = buffer.hasRemaining } } private[sql] case object RunLengthEncoding extends CompressionScheme { override val typeId = 1 - override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = { + override def encoder[T <: NativeType](columnType: NativeColumnType[T]): Encoder[T] = { new this.Encoder[T](columnType) } - override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = { + override def decoder[T <: NativeType]( + buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = { new this.Decoder(buffer, columnType) } - override def supports(columnType: ColumnType[_, _]) = columnType match { + override def supports(columnType: ColumnType[_, _]): Boolean = columnType match { case INT | LONG | SHORT | BYTE | STRING | BOOLEAN => true case _ => false } @@ -90,9 +92,9 @@ private[sql] case object RunLengthEncoding extends CompressionScheme { private val lastValue = new SpecificMutableRow(Seq(columnType.dataType)) private var lastRun = 0 - override def uncompressedSize = _uncompressedSize + override def uncompressedSize: Int = _uncompressedSize - override def compressedSize = _compressedSize + override def compressedSize: Int = _compressedSize override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = { val value = columnType.getField(row, ordinal) @@ -114,7 +116,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme { } } - override def compress(from: ByteBuffer, to: ByteBuffer) = { + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { to.putInt(RunLengthEncoding.typeId) if (from.hasRemaining) { @@ -169,7 +171,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme { columnType.setField(row, ordinal, currentValue) } - override def hasNext = valueCount < run || buffer.hasRemaining + override def hasNext: Boolean = valueCount < run || buffer.hasRemaining } } @@ -179,15 +181,16 @@ private[sql] case object DictionaryEncoding extends CompressionScheme { // 32K unique values allowed val MAX_DICT_SIZE = Short.MaxValue - override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = { + override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : Decoder[T] = { new this.Decoder(buffer, columnType) } - override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = { + override def encoder[T <: NativeType](columnType: NativeColumnType[T]): Encoder[T] = { new this.Encoder[T](columnType) } - override def supports(columnType: ColumnType[_, _]) = columnType match { + override def supports(columnType: ColumnType[_, _]): Boolean = columnType match { case INT | LONG | STRING => true case _ => false } @@ -237,7 +240,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme { } } - override def compress(from: ByteBuffer, to: ByteBuffer) = { + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { if (overflow) { throw new IllegalStateException( "Dictionary encoding should not be used because of dictionary overflow.") @@ -260,9 +263,9 @@ private[sql] case object DictionaryEncoding extends CompressionScheme { to } - override def uncompressedSize = _uncompressedSize + override def uncompressedSize: Int = _uncompressedSize - override def compressedSize = if (overflow) Int.MaxValue else dictionarySize + count * 2 + override def compressedSize: Int = if (overflow) Int.MaxValue else dictionarySize + count * 2 } class Decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) @@ -284,7 +287,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme { columnType.setField(row, ordinal, dictionary(buffer.getShort())) } - override def hasNext = buffer.hasRemaining + override def hasNext: Boolean = buffer.hasRemaining } } @@ -293,15 +296,16 @@ private[sql] case object BooleanBitSet extends CompressionScheme { val BITS_PER_LONG = 64 - override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = { + override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : compression.Decoder[T] = { new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]] } - override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = { + override def encoder[T <: NativeType](columnType: NativeColumnType[T]): compression.Encoder[T] = { (new this.Encoder).asInstanceOf[compression.Encoder[T]] } - override def supports(columnType: ColumnType[_, _]) = columnType == BOOLEAN + override def supports(columnType: ColumnType[_, _]): Boolean = columnType == BOOLEAN class Encoder extends compression.Encoder[BooleanType.type] { private var _uncompressedSize = 0 @@ -310,7 +314,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme { _uncompressedSize += BOOLEAN.defaultSize } - override def compress(from: ByteBuffer, to: ByteBuffer) = { + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { to.putInt(BooleanBitSet.typeId) // Total element count (1 byte per Boolean value) .putInt(from.remaining) @@ -347,9 +351,9 @@ private[sql] case object BooleanBitSet extends CompressionScheme { to } - override def uncompressedSize = _uncompressedSize + override def uncompressedSize: Int = _uncompressedSize - override def compressedSize = { + override def compressedSize: Int = { val extra = if (_uncompressedSize % BITS_PER_LONG == 0) 0 else 1 (_uncompressedSize / BITS_PER_LONG + extra) * 8 + 4 } @@ -380,22 +384,23 @@ private[sql] case object BooleanBitSet extends CompressionScheme { private[sql] case object IntDelta extends CompressionScheme { override def typeId: Int = 4 - override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = { + override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : compression.Decoder[T] = { new Decoder(buffer, INT).asInstanceOf[compression.Decoder[T]] } - override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = { + override def encoder[T <: NativeType](columnType: NativeColumnType[T]): compression.Encoder[T] = { (new Encoder).asInstanceOf[compression.Encoder[T]] } - override def supports(columnType: ColumnType[_, _]) = columnType == INT + override def supports(columnType: ColumnType[_, _]): Boolean = columnType == INT class Encoder extends compression.Encoder[IntegerType.type] { protected var _compressedSize: Int = 0 protected var _uncompressedSize: Int = 0 - override def compressedSize = _compressedSize - override def uncompressedSize = _uncompressedSize + override def compressedSize: Int = _compressedSize + override def uncompressedSize: Int = _uncompressedSize private var prevValue: Int = _ @@ -459,22 +464,23 @@ private[sql] case object IntDelta extends CompressionScheme { private[sql] case object LongDelta extends CompressionScheme { override def typeId: Int = 5 - override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = { + override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : compression.Decoder[T] = { new Decoder(buffer, LONG).asInstanceOf[compression.Decoder[T]] } - override def encoder[T <: NativeType](columnType: NativeColumnType[T]) = { + override def encoder[T <: NativeType](columnType: NativeColumnType[T]): compression.Encoder[T] = { (new Encoder).asInstanceOf[compression.Encoder[T]] } - override def supports(columnType: ColumnType[_, _]) = columnType == LONG + override def supports(columnType: ColumnType[_, _]): Boolean = columnType == LONG class Encoder extends compression.Encoder[LongType.type] { protected var _compressedSize: Int = 0 protected var _uncompressedSize: Int = 0 - override def compressedSize = _compressedSize - override def uncompressedSize = _uncompressedSize + override def compressedSize: Int = _compressedSize + override def uncompressedSize: Int = _uncompressedSize private var prevValue: Long = _ http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala index ad44a01..18b1ba4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala @@ -21,6 +21,7 @@ import java.util.HashMap import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ @@ -45,7 +46,7 @@ case class Aggregate( child: SparkPlan) extends UnaryNode { - override def requiredChildDistribution = + override def requiredChildDistribution: List[Distribution] = { if (partial) { UnspecifiedDistribution :: Nil } else { @@ -55,8 +56,9 @@ case class Aggregate( ClusteredDistribution(groupingExpressions) :: Nil } } + } - override def output = aggregateExpressions.map(_.toAttribute) + override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) /** * An aggregate that needs to be computed for each row in a group. @@ -119,7 +121,7 @@ case class Aggregate( } } - override def execute() = attachTree(this, "execute") { + override def execute(): RDD[Row] = attachTree(this, "execute") { if (groupingExpressions.isEmpty) { child.execute().mapPartitions { iter => val buffer = newAggregateBuffer() http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 7c0b72a..437408d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.shuffle.sort.SortShuffleManager +import org.apache.spark.sql.catalyst.expressions import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf} -import org.apache.spark.rdd.ShuffledRDD +import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.sql.{SQLContext, Row} import org.apache.spark.sql.catalyst.errors.attachTree -import org.apache.spark.sql.catalyst.expressions.RowOrdering +import org.apache.spark.sql.catalyst.expressions.{Attribute, RowOrdering} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.util.MutablePair @@ -34,9 +35,9 @@ import org.apache.spark.util.MutablePair @DeveloperApi case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode { - override def outputPartitioning = newPartitioning + override def outputPartitioning: Partitioning = newPartitioning - override def output = child.output + override def output: Seq[Attribute] = child.output /** We must copy rows when sort based shuffle is on */ protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] @@ -44,7 +45,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una private val bypassMergeThreshold = child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) - override def execute() = attachTree(this , "execute") { + override def execute(): RDD[Row] = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. @@ -123,13 +124,13 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una */ private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] { // TODO: Determine the number of partitions. - def numPartitions = sqlContext.conf.numShufflePartitions + def numPartitions: Int = sqlContext.conf.numShufflePartitions def apply(plan: SparkPlan): SparkPlan = plan.transformUp { case operator: SparkPlan => // Check if every child's outputPartitioning satisfies the corresponding // required data distribution. - def meetsRequirements = + def meetsRequirements: Boolean = !operator.requiredChildDistribution.zip(operator.children).map { case (required, child) => val valid = child.outputPartitioning.satisfies(required) @@ -147,7 +148,7 @@ private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPl // datasets are both clustered by "a", but these two outputPartitionings are not // compatible. // TODO: ASSUMES TRANSITIVITY? - def compatible = + def compatible: Boolean = !operator.children .map(_.outputPartitioning) .sliding(2) @@ -158,7 +159,7 @@ private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPl // Check if the partitioning we want to ensure is the same as the child's output // partitioning. If so, we do not need to add the Exchange operator. - def addExchangeIfNecessary(partitioning: Partitioning, child: SparkPlan) = + def addExchangeIfNecessary(partitioning: Partitioning, child: SparkPlan): SparkPlan = if (child.outputPartitioning != partitioning) Exchange(partitioning, child) else child if (meetsRequirements && compatible) { http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 248dc15..d895572 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -26,6 +26,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.types.StructType +import scala.collection.immutable + /** * :: DeveloperApi :: */ @@ -58,17 +60,17 @@ object RDDConversions { case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLContext) extends LogicalPlan with MultiInstanceRelation { - override def children = Nil + override def children: Seq[LogicalPlan] = Nil - override def newInstance() = + override def newInstance(): LogicalRDD.this.type = LogicalRDD(output.map(_.newInstance()), rdd)(sqlContext).asInstanceOf[this.type] - override def sameResult(plan: LogicalPlan) = plan match { + override def sameResult(plan: LogicalPlan): Boolean = plan match { case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id case _ => false } - @transient override lazy val statistics = Statistics( + @transient override lazy val statistics: Statistics = Statistics( // TODO: Instead of returning a default value here, find a way to return a meaningful size // estimate for RDDs. See PR 1238 for more discussions. sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes) @@ -77,24 +79,24 @@ case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLCont /** Physical plan node for scanning data from an RDD. */ case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { - override def execute() = rdd + override def execute(): RDD[Row] = rdd } /** Logical plan node for scanning data from a local collection. */ case class LogicalLocalTable(output: Seq[Attribute], rows: Seq[Row])(sqlContext: SQLContext) extends LogicalPlan with MultiInstanceRelation { - override def children = Nil + override def children: Seq[LogicalPlan] = Nil - override def newInstance() = + override def newInstance(): this.type = LogicalLocalTable(output.map(_.newInstance()), rows)(sqlContext).asInstanceOf[this.type] - override def sameResult(plan: LogicalPlan) = plan match { + override def sameResult(plan: LogicalPlan): Boolean = plan match { case LogicalRDD(_, otherRDD) => rows == rows case _ => false } - @transient override lazy val statistics = Statistics( + @transient override lazy val statistics: Statistics = Statistics( // TODO: Improve the statistics estimation. // This is made small enough so it can be broadcasted. sizeInBytes = sqlContext.conf.autoBroadcastJoinThreshold - 1 http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala index 9517242..5758494 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ @@ -42,7 +43,7 @@ case class Expand( // as UNKNOWN partitioning override def outputPartitioning: Partitioning = UnknownPartitioning(0) - override def execute() = attachTree(this, "execute") { + override def execute(): RDD[Row] = attachTree(this, "execute") { child.execute().mapPartitions { iter => // TODO Move out projection objects creation and transfer to // workers via closure. However we can't assume the Projection @@ -55,7 +56,7 @@ case class Expand( private[this] var idx = -1 // -1 means the initial state private[this] var input: Row = _ - override final def hasNext = (-1 < idx && idx < groups.length) || iter.hasNext + override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || iter.hasNext override final def next(): Row = { if (idx <= 0) { http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala index 38877c2..1227104 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ /** @@ -54,7 +55,7 @@ case class Generate( val boundGenerator = BindReferences.bindReference(generator, child.output) - override def execute() = { + override def execute(): RDD[Row] = { if (join) { child.execute().mapPartitions { iter => val nullValues = Seq.fill(generator.output.size)(Literal(null)) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala index 4abe26f..89682d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.trees._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ @@ -49,7 +50,7 @@ case class GeneratedAggregate( child: SparkPlan) extends UnaryNode { - override def requiredChildDistribution = + override def requiredChildDistribution: Seq[Distribution] = if (partial) { UnspecifiedDistribution :: Nil } else { @@ -60,9 +61,9 @@ case class GeneratedAggregate( } } - override def output = aggregateExpressions.map(_.toAttribute) + override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) - override def execute() = { + override def execute(): RDD[Row] = { val aggregatesToCompute = aggregateExpressions.flatMap { a => a.collect { case agg: AggregateExpression => agg} } @@ -271,9 +272,9 @@ case class GeneratedAggregate( private[this] val resultIterator = buffers.entrySet.iterator() private[this] val resultProjection = resultProjectionBuilder() - def hasNext = resultIterator.hasNext + def hasNext: Boolean = resultIterator.hasNext - def next() = { + def next(): Row = { val currentGroup = resultIterator.next() resultProjection(joinedRow(currentGroup.getKey, currentGroup.getValue)) } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala index d3a18b3..5bd699a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.expressions.Attribute @@ -29,11 +30,11 @@ case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) extends LeafNo private lazy val rdd = sqlContext.sparkContext.parallelize(rows) - override def execute() = rdd + override def execute(): RDD[Row] = rdd - override def executeCollect() = + override def executeCollect(): Array[Row] = rows.map(ScalaReflection.convertRowToScala(_, schema)).toArray - override def executeTake(limit: Int) = + override def executeTake(limit: Int): Array[Row] = rows.map(ScalaReflection.convertRowToScala(_, schema)).take(limit).toArray } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 052766c..d239637 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -67,6 +67,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ // TODO: Move to `DistributedPlan` /** Specifies how data is partitioned across different nodes in the cluster. */ def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH! + /** Specifies any partition requirements on the input data for this operator. */ def requiredChildDistribution: Seq[Distribution] = Seq.fill(children.size)(UnspecifiedDistribution) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index 30564e1..c4534fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -74,7 +74,7 @@ private[execution] class KryoResourcePool(size: Int) new KryoSerializer(sparkConf) } - def newInstance() = ser.newInstance() + def newInstance(): SerializerInstance = ser.newInstance() } private[sql] object SparkSqlSerializer { http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 5281c75..2b58115 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -154,7 +154,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case _ => Nil } - def canBeCodeGened(aggs: Seq[AggregateExpression]) = !aggs.exists { + def canBeCodeGened(aggs: Seq[AggregateExpression]): Boolean = !aggs.exists { case _: Sum | _: Count | _: Max | _: CombineSetsAndCount => false // The generated set implementation is pretty limited ATM. case CollectHashSet(exprs) if exprs.size == 1 && @@ -162,7 +162,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case _ => true } - def allAggregates(exprs: Seq[Expression]) = + def allAggregates(exprs: Seq[Expression]): Seq[AggregateExpression] = exprs.flatMap(_.collect { case a: AggregateExpression => a }) } @@ -257,7 +257,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Can we automate these 'pass through' operations? object BasicOperators extends Strategy { - def numPartitions = self.numPartitions + def numPartitions: Int = self.numPartitions def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 7102685..20c9bc3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -24,7 +24,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.util.MutablePair import org.apache.spark.util.collection.ExternalSorter @@ -33,11 +33,11 @@ import org.apache.spark.util.collection.ExternalSorter */ @DeveloperApi case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode { - override def output = projectList.map(_.toAttribute) + override def output: Seq[Attribute] = projectList.map(_.toAttribute) @transient lazy val buildProjection = newMutableProjection(projectList, child.output) - override def execute() = child.execute().mapPartitions { iter => + override def execute(): RDD[Row] = child.execute().mapPartitions { iter => val resuableProjection = buildProjection() iter.map(resuableProjection) } @@ -48,11 +48,11 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends */ @DeveloperApi case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode { - override def output = child.output + override def output: Seq[Attribute] = child.output - @transient lazy val conditionEvaluator = newPredicate(condition, child.output) + @transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output) - override def execute() = child.execute().mapPartitions { iter => + override def execute(): RDD[Row] = child.execute().mapPartitions { iter => iter.filter(conditionEvaluator) } } @@ -64,10 +64,12 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode { case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan) extends UnaryNode { - override def output = child.output + override def output: Seq[Attribute] = child.output // TODO: How to pick seed? - override def execute() = child.execute().map(_.copy()).sample(withReplacement, fraction, seed) + override def execute(): RDD[Row] = { + child.execute().map(_.copy()).sample(withReplacement, fraction, seed) + } } /** @@ -76,8 +78,8 @@ case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: @DeveloperApi case class Union(children: Seq[SparkPlan]) extends SparkPlan { // TODO: attributes output by union should be distinct for nullability purposes - override def output = children.head.output - override def execute() = sparkContext.union(children.map(_.execute())) + override def output: Seq[Attribute] = children.head.output + override def execute(): RDD[Row] = sparkContext.union(children.map(_.execute())) } /** @@ -97,12 +99,12 @@ case class Limit(limit: Int, child: SparkPlan) /** We must copy rows when sort based shuffle is on */ private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] - override def output = child.output - override def outputPartitioning = SinglePartition + override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[Row] = child.executeTake(limit) - override def execute() = { + override def execute(): RDD[Row] = { val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) { child.execute().mapPartitions { iter => iter.take(limit).map(row => (false, row.copy())) @@ -129,20 +131,21 @@ case class Limit(limit: Int, child: SparkPlan) @DeveloperApi case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode { - override def output = child.output - override def outputPartitioning = SinglePartition + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = SinglePartition - val ord = new RowOrdering(sortOrder, child.output) + private val ord: RowOrdering = new RowOrdering(sortOrder, child.output) - private def collectData() = child.execute().map(_.copy()).takeOrdered(limit)(ord) + private def collectData(): Array[Row] = child.execute().map(_.copy()).takeOrdered(limit)(ord) // TODO: Is this copying for no reason? - override def executeCollect() = + override def executeCollect(): Array[Row] = collectData().map(ScalaReflection.convertRowToScala(_, this.schema)) // TODO: Terminal split should be implemented differently from non-terminal split. // TODO: Pick num splits based on |limit|. - override def execute() = sparkContext.makeRDD(collectData(), 1) + override def execute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1) } /** @@ -157,17 +160,17 @@ case class Sort( global: Boolean, child: SparkPlan) extends UnaryNode { - override def requiredChildDistribution = + override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - override def execute() = attachTree(this, "sort") { + override def execute(): RDD[Row] = attachTree(this, "sort") { child.execute().mapPartitions( { iterator => val ordering = newOrdering(sortOrder, child.output) iterator.map(_.copy()).toArray.sorted(ordering).iterator }, preservesPartitioning = true) } - override def output = child.output + override def output: Seq[Attribute] = child.output } /** @@ -182,10 +185,11 @@ case class ExternalSort( global: Boolean, child: SparkPlan) extends UnaryNode { - override def requiredChildDistribution = + + override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - override def execute() = attachTree(this, "sort") { + override def execute(): RDD[Row] = attachTree(this, "sort") { child.execute().mapPartitions( { iterator => val ordering = newOrdering(sortOrder, child.output) val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering)) @@ -194,7 +198,7 @@ case class ExternalSort( }, preservesPartitioning = true) } - override def output = child.output + override def output: Seq[Attribute] = child.output } /** @@ -206,12 +210,12 @@ case class ExternalSort( */ @DeveloperApi case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode { - override def output = child.output + override def output: Seq[Attribute] = child.output - override def requiredChildDistribution = + override def requiredChildDistribution: Seq[Distribution] = if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil - override def execute() = { + override def execute(): RDD[Row] = { child.execute().mapPartitions { iter => val hashSet = new scala.collection.mutable.HashSet[Row]() @@ -236,9 +240,9 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode { */ @DeveloperApi case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { - override def output = left.output + override def output: Seq[Attribute] = left.output - override def execute() = { + override def execute(): RDD[Row] = { left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) } } @@ -250,9 +254,9 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { */ @DeveloperApi case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { - override def output = children.head.output + override def output: Seq[Attribute] = children.head.output - override def execute() = { + override def execute(): RDD[Row] = { left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) } } @@ -265,6 +269,7 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { */ @DeveloperApi case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan { - def children = child :: Nil - def execute() = child.execute() + def children: Seq[SparkPlan] = child :: Nil + + def execute(): RDD[Row] = child.execute() } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index a112321..fad7a28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row, Attribute} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import scala.collection.mutable.ArrayBuffer /** * A logical command that is executed for its side-effects. `RunnableCommand`s are @@ -54,9 +53,9 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { */ protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext) - override def output = cmd.output + override def output: Seq[Attribute] = cmd.output - override def children = Nil + override def children: Seq[SparkPlan] = Nil override def executeCollect(): Array[Row] = sideEffectResult.toArray @@ -71,9 +70,10 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { @DeveloperApi case class SetCommand( kv: Option[(String, Option[String])], - override val output: Seq[Attribute]) extends RunnableCommand with Logging { + override val output: Seq[Attribute]) + extends RunnableCommand with Logging { - override def run(sqlContext: SQLContext) = kv match { + override def run(sqlContext: SQLContext): Seq[Row] = kv match { // Configures the deprecated "mapred.reduce.tasks" property. case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) => logWarning( @@ -119,10 +119,11 @@ case class ExplainCommand( logicalPlan: LogicalPlan, override val output: Seq[Attribute] = Seq(AttributeReference("plan", StringType, nullable = false)()), - extended: Boolean = false) extends RunnableCommand { + extended: Boolean = false) + extends RunnableCommand { // Run through the optimizer to generate the physical plan. - override def run(sqlContext: SQLContext) = try { + override def run(sqlContext: SQLContext): Seq[Row] = try { // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. val queryExecution = sqlContext.executePlan(logicalPlan) val outputString = if (extended) queryExecution.toString else queryExecution.simpleString @@ -140,9 +141,10 @@ case class ExplainCommand( case class CacheTableCommand( tableName: String, plan: Option[LogicalPlan], - isLazy: Boolean) extends RunnableCommand { + isLazy: Boolean) + extends RunnableCommand { - override def run(sqlContext: SQLContext) = { + override def run(sqlContext: SQLContext): Seq[Row] = { plan.foreach { logicalPlan => sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName) } @@ -166,7 +168,7 @@ case class CacheTableCommand( @DeveloperApi case class UncacheTableCommand(tableName: String) extends RunnableCommand { - override def run(sqlContext: SQLContext) = { + override def run(sqlContext: SQLContext): Seq[Row] = { sqlContext.table(tableName).unpersist(blocking = false) Seq.empty[Row] } @@ -181,7 +183,7 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand { @DeveloperApi case object ClearCacheCommand extends RunnableCommand { - override def run(sqlContext: SQLContext) = { + override def run(sqlContext: SQLContext): Seq[Row] = { sqlContext.clearCache() Seq.empty[Row] } @@ -196,9 +198,10 @@ case object ClearCacheCommand extends RunnableCommand { case class DescribeCommand( child: SparkPlan, override val output: Seq[Attribute], - isExtended: Boolean) extends RunnableCommand { + isExtended: Boolean) + extends RunnableCommand { - override def run(sqlContext: SQLContext) = { + override def run(sqlContext: SQLContext): Seq[Row] = { child.schema.fields.map { field => val cmtKey = "comment" val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else "" @@ -220,7 +223,7 @@ case class DescribeCommand( case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand { // The result of SHOW TABLES has two columns, tableName and isTemporary. - override val output = { + override val output: Seq[Attribute] = { val schema = StructType( StructField("tableName", StringType, false) :: StructField("isTemporary", BooleanType, false) :: Nil) @@ -228,7 +231,7 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma schema.toAttributes } - override def run(sqlContext: SQLContext) = { + override def run(sqlContext: SQLContext): Seq[Row] = { // Since we need to return a Seq of rows, we will call getTables directly // instead of calling tables in sqlContext. val rows = sqlContext.catalog.getTables(databaseName).map { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org