From: davies@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-10938] [SQL] remove typeId in columnar cache
Date: Tue, 6 Oct 2015 15:45:37 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 4e0027fea -> 27ecfe61f


[SPARK-10938] [SQL] remove typeId in columnar cache

This PR removes the typeId from the columnar cache, since it is no longer
needed. It also removes the DATE and TIMESTAMP column types, which now use
INT and LONG instead.

Author: Davies Liu

Closes #8989 from davies/refactor_cache.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27ecfe61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27ecfe61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27ecfe61

Branch: refs/heads/master
Commit: 27ecfe61f07c8413a7b8b9fbdf36ed99cf05227d
Parents: 4e0027f
Author: Davies Liu
Authored: Tue Oct 6 08:45:31 2015 -0700
Committer: Davies Liu
Committed: Tue Oct 6 08:45:31 2015 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |  4 +-
 .../spark/sql/columnar/ColumnAccessor.scala     | 36 ++++------
 .../spark/sql/columnar/ColumnBuilder.scala      | 16 ++---
 .../apache/spark/sql/columnar/ColumnStats.scala |  4 --
 .../apache/spark/sql/columnar/ColumnType.scala  | 71 ++++----------
 .../sql/columnar/NullableColumnBuilder.scala    | 19 +++---
 .../compression/CompressibleColumnBuilder.scala | 27 ++++----
 .../compression/CompressionScheme.scala         |  6 +-
 .../spark/sql/columnar/ColumnStatsSuite.scala   |  3 -
 .../spark/sql/columnar/ColumnTypeSuite.scala    | 10 +--
 .../spark/sql/columnar/ColumnarTestUtils.scala  |  5 +-
 .../columnar/NullableColumnAccessorSuite.scala  |  8 +--
 .../columnar/NullableColumnBuilderSuite.scala   |  5 +-
 13 files changed, 63 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
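
The key observation behind the change: the Catalyst data type is always known
when a cached buffer is read back, and DateType/TimestampType share their
physical representation with IntegerType/LongType (an Int of days since epoch,
a Long of microseconds). A minimal, self-contained sketch of that dispatch,
using illustrative case objects and a hypothetical readOne helper rather than
Spark's real classes:

import java.nio.{ByteBuffer, ByteOrder}

object DispatchSketch {
  sealed trait CatalystType
  case object IntegerType extends CatalystType
  case object DateType extends CatalystType      // internally an Int: days since epoch
  case object LongType extends CatalystType
  case object TimestampType extends CatalystType // internally a Long: microseconds

  // The data type alone picks the physical reader, so no per-buffer
  // type ID is needed. (Illustrative helper, not Spark's ColumnAccessor.)
  def readOne(dataType: CatalystType, buf: ByteBuffer): Any = dataType match {
    case IntegerType | DateType   => buf.getInt()  // INT layout
    case LongType | TimestampType => buf.getLong() // LONG layout
  }

  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.allocate(12).order(ByteOrder.nativeOrder)
    buf.putInt(16714)              // a date, as days since 1970-01-01
    buf.putLong(1444146337000000L) // a timestamp, as microseconds
    buf.flip()
    println(readOne(DateType, buf))      // 16714
    println(readOne(TimestampType, buf)) // 1444146337000000
  }
}
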

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b2e6be7..2d4d146 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -42,7 +42,9 @@ object MimaExcludes {
         excludePackage("org.spark-project.jetty"),
         MimaBuild.excludeSparkPackage("unused"),
         // SQL execution is considered private.
-        excludePackage("org.apache.spark.sql.execution")
+        excludePackage("org.apache.spark.sql.execution"),
+        // SQL columnar is considered private.
+        excludePackage("org.apache.spark.sql.columnar")
       ) ++
       MimaBuild.excludeSparkClass("streaming.flume.FlumeTestUtils") ++
       MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++


http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
index 4c29a09..2b1d700 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -103,35 +103,23 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer, dataType: DataType)
   extends BasicColumnAccessor[Array[Byte]](buffer, GENERIC(dataType))
   with NullableColumnAccessor
 
-private[sql] class DateColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, DATE)
-
-private[sql] class TimestampColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, TIMESTAMP)
-
 private[sql] object ColumnAccessor {
   def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = {
-    val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
-
-    // The first 4 bytes in the buffer indicate the column type. This field is not used now,
-    // because we always know the data type of the column ahead of time.
-    dup.getInt()
+    val buf = buffer.order(ByteOrder.nativeOrder)
 
     dataType match {
-      case BooleanType => new BooleanColumnAccessor(dup)
-      case ByteType => new ByteColumnAccessor(dup)
-      case ShortType => new ShortColumnAccessor(dup)
-      case IntegerType => new IntColumnAccessor(dup)
-      case DateType => new DateColumnAccessor(dup)
-      case LongType => new LongColumnAccessor(dup)
-      case TimestampType => new TimestampColumnAccessor(dup)
-      case FloatType => new FloatColumnAccessor(dup)
-      case DoubleType => new DoubleColumnAccessor(dup)
-      case StringType => new StringColumnAccessor(dup)
-      case BinaryType => new BinaryColumnAccessor(dup)
+      case BooleanType => new BooleanColumnAccessor(buf)
+      case ByteType => new ByteColumnAccessor(buf)
+      case ShortType => new ShortColumnAccessor(buf)
+      case IntegerType | DateType => new IntColumnAccessor(buf)
+      case LongType | TimestampType => new LongColumnAccessor(buf)
+      case FloatType => new FloatColumnAccessor(buf)
+      case DoubleType => new DoubleColumnAccessor(buf)
+      case StringType => new StringColumnAccessor(buf)
+      case BinaryType => new BinaryColumnAccessor(buf)
       case DecimalType.Fixed(precision, scale) if precision < 19 =>
-        new FixedDecimalColumnAccessor(dup, precision, scale)
-      case other => new GenericColumnAccessor(dup, other)
+        new FixedDecimalColumnAccessor(buf, precision, scale)
+      case other => new GenericColumnAccessor(buf, other)
     }
   }
 }
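
What the removed prefix means for a reader of such a buffer, as a standalone
sketch with plain java.nio (variable names are illustrative; this is not
Spark's accessor API):

import java.nio.{ByteBuffer, ByteOrder}

object AccessorSketch {
  def main(args: Array[String]): Unit = {
    // New format: element data starts at offset 0.
    val buf = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder)
    buf.putInt(1).putInt(2)
    buf.flip()

    // Old format: a reader first had to discard the 4-byte type ID,
    // e.g. buf.getInt(). With this commit that prefix is gone.
    assert(buf.getInt() == 1)
    assert(buf.getInt() == 2)
  }
}
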

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 1620fc4..2e60564 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -63,9 +63,8 @@ private[sql] class BasicColumnBuilder[JvmType](
     val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
     this.columnName = columnName
 
-    // Reserves 4 bytes for column type ID
-    buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize)
-    buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
+    buffer = ByteBuffer.allocate(size * columnType.defaultSize)
+    buffer.order(ByteOrder.nativeOrder())
   }
 
   override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
@@ -121,11 +120,6 @@ private[sql] class FixedDecimalColumnBuilder(
 private[sql] class GenericColumnBuilder(dataType: DataType)
   extends ComplexColumnBuilder(new GenericColumnStats(dataType), GENERIC(dataType))
 
-private[sql] class DateColumnBuilder extends NativeColumnBuilder(new DateColumnStats, DATE)
-
-private[sql] class TimestampColumnBuilder
-  extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP)
-
 private[sql] object ColumnBuilder {
   val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
 
@@ -154,10 +148,8 @@ private[sql] object ColumnBuilder {
       case BooleanType => new BooleanColumnBuilder
       case ByteType => new ByteColumnBuilder
       case ShortType => new ShortColumnBuilder
-      case IntegerType => new IntColumnBuilder
-      case DateType => new DateColumnBuilder
-      case LongType => new LongColumnBuilder
-      case TimestampType => new TimestampColumnBuilder
+      case IntegerType | DateType => new IntColumnBuilder
+      case LongType | TimestampType => new LongColumnBuilder
       case FloatType => new FloatColumnBuilder
       case DoubleType => new DoubleColumnBuilder
       case StringType => new StringColumnBuilder
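
The initialize change above is pure arithmetic: the initial allocation no
longer reserves a 4-byte prefix for the type ID. A sketch of that math under
assumed values (an INT column with a 1024-element initial capacity):

import java.nio.{ByteBuffer, ByteOrder}

object BuilderAllocationSketch {
  def main(args: Array[String]): Unit = {
    val initialSize = 1024
    val defaultSize = 4 // bytes per Int element

    // old: ByteBuffer.allocate(4 + initialSize * defaultSize).putInt(typeId)
    val buffer = ByteBuffer
      .allocate(initialSize * defaultSize)
      .order(ByteOrder.nativeOrder())

    assert(buffer.capacity == 4096) // no 4-byte type-ID prefix anymore
  }
}
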

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index fbd51b7..3b5052b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -266,7 +266,3 @@ private[sql] class GenericColumnStats(dataType: DataType) extends ColumnStats {
   override def collectedStatistics: GenericInternalRow =
     new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes))
 }
-
-private[sql] class DateColumnStats extends IntColumnStats
-
-private[sql] class TimestampColumnStats extends LongColumnStats


http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index ab482a3..3a0cea8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -38,9 +38,6 @@ private[sql] sealed abstract class ColumnType[JvmType] {
   // The catalyst data type of this column.
   def dataType: DataType
 
-  // A unique ID representing the type.
-  def typeId: Int
-
   // Default size in bytes for one element of type T (e.g. 4 for `Int`).
   def defaultSize: Int
 
@@ -107,7 +104,6 @@ private[sql] sealed abstract class ColumnType[JvmType] {
 
 private[sql] abstract class NativeColumnType[T <: AtomicType](
     val dataType: T,
-    val typeId: Int,
     val defaultSize: Int)
   extends ColumnType[T#InternalType] {
 
@@ -117,7 +113,7 @@ private[sql] abstract class NativeColumnType[T <: AtomicType](
   def scalaTag: TypeTag[dataType.InternalType] = dataType.tag
 }
 
-private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
+private[sql] object INT extends NativeColumnType(IntegerType, 4) {
   override def append(v: Int, buffer: ByteBuffer): Unit = {
     buffer.putInt(v)
   }
@@ -145,7 +141,7 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
   }
 }
 
-private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
+private[sql] object LONG extends NativeColumnType(LongType, 8) {
   override def append(v: Long, buffer: ByteBuffer): Unit = {
     buffer.putLong(v)
   }
@@ -173,7 +169,7 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
   }
 }
 
-private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
+private[sql] object FLOAT extends NativeColumnType(FloatType, 4) {
   override def append(v: Float, buffer: ByteBuffer): Unit = {
     buffer.putFloat(v)
   }
@@ -201,7 +197,7 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
   }
 }
 
-private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
+private[sql] object DOUBLE extends NativeColumnType(DoubleType, 8) {
   override def append(v: Double, buffer: ByteBuffer): Unit = {
     buffer.putDouble(v)
   }
@@ -229,7 +225,7 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
   }
 }
 
-private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
+private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 1) {
   override def append(v: Boolean, buffer: ByteBuffer): Unit = {
     buffer.put(if (v) 1: Byte else 0: Byte)
   }
@@ -255,7 +251,7 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
   }
 }
 
-private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
+private[sql] object BYTE extends NativeColumnType(ByteType, 1) {
   override def append(v: Byte, buffer: ByteBuffer): Unit = {
     buffer.put(v)
   }
@@ -283,7 +279,7 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
   }
 }
 
-private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
+private[sql] object SHORT extends NativeColumnType(ShortType, 2) {
   override def append(v: Short, buffer: ByteBuffer): Unit = {
     buffer.putShort(v)
   }
@@ -311,7 +307,7 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
   }
 }
 
-private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
+private[sql] object STRING extends NativeColumnType(StringType, 8) {
   override def actualSize(row: InternalRow, ordinal: Int): Int = {
     row.getUTF8String(ordinal).numBytes() + 4
   }
@@ -343,46 +339,9 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
   override def clone(v: UTF8String): UTF8String = v.clone()
 }
 
-private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
-  override def extract(buffer: ByteBuffer): Int = {
-    buffer.getInt
-  }
-
-  override def append(v: Int, buffer: ByteBuffer): Unit = {
-    buffer.putInt(v)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Int = {
-    row.getInt(ordinal)
-  }
-
-  def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
-    row(ordinal) = value
-  }
-}
-
-private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 8) {
-  override def extract(buffer: ByteBuffer): Long = {
-    buffer.getLong
-  }
-
-  override def append(v: Long, buffer: ByteBuffer): Unit = {
-    buffer.putLong(v)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Long = {
-    row.getLong(ordinal)
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
-    row(ordinal) = value
-  }
-}
-
 private[sql] case class FIXED_DECIMAL(precision: Int, scale: Int)
   extends NativeColumnType(
     DecimalType(precision, scale),
-    10,
     FIXED_DECIMAL.defaultSize) {
 
   override def extract(buffer: ByteBuffer): Decimal = {
@@ -410,9 +369,7 @@ private[sql] object FIXED_DECIMAL {
   val defaultSize = 8
 }
 
-private[sql] sealed abstract class ByteArrayColumnType(
-    val typeId: Int,
-    val defaultSize: Int)
+private[sql] sealed abstract class ByteArrayColumnType(val defaultSize: Int)
   extends ColumnType[Array[Byte]] {
 
   override def actualSize(row: InternalRow, ordinal: Int): Int = {
@@ -431,7 +388,7 @@ private[sql] sealed abstract class ByteArrayColumnType(
   }
 }
 
-private[sql] object BINARY extends ByteArrayColumnType(11, 16) {
+private[sql] object BINARY extends ByteArrayColumnType(16) {
 
   def dataType: DataType = BooleanType
 
@@ -447,7 +404,7 @@ private[sql] object BINARY extends ByteArrayColumnType(11, 16) {
 // Used to process generic objects (all types other than those listed above). Objects should be
 // serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized
 // byte array.
-private[sql] case class GENERIC(dataType: DataType) extends ByteArrayColumnType(12, 16) {
+private[sql] case class GENERIC(dataType: DataType) extends ByteArrayColumnType(16) {
   override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = {
     row.update(ordinal, SparkSqlSerializer.deserialize[Any](value))
   }
@@ -463,10 +420,8 @@ private[sql] object ColumnType {
       case BooleanType => BOOLEAN
       case ByteType => BYTE
       case ShortType => SHORT
-      case IntegerType => INT
-      case DateType => DATE
-      case LongType => LONG
-      case TimestampType => TIMESTAMP
+      case IntegerType | DateType => INT
+      case LongType | TimestampType => LONG
       case FloatType => FLOAT
       case DoubleType => DOUBLE
      case StringType => STRING
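
Variable-length types keep their length-prefixed framing (STRING's actualSize
above is the UTF-8 byte count plus a 4-byte length prefix). A standalone
round-trip of that framing, using plain java.nio rather than Spark's STRING
object:

import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

object LengthPrefixedSketch {
  def main(args: Array[String]): Unit = {
    val payload = "hello".getBytes(StandardCharsets.UTF_8)
    val actualSize = 4 + payload.length // 4-byte length prefix + bytes

    val buf = ByteBuffer.allocate(actualSize).order(ByteOrder.nativeOrder)
    buf.putInt(payload.length).put(payload)
    buf.flip()

    val len = buf.getInt()
    val out = new Array[Byte](len)
    buf.get(out)
    assert(new String(out, StandardCharsets.UTF_8) == "hello")
  }
}
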

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
index ba47bc7..76cfddf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -25,14 +25,13 @@ import org.apache.spark.sql.catalyst.InternalRow
  * A stackable trait used for building byte buffer for a column containing null values.  Memory
  * layout of the final byte buffer is:
  * {{{
- *    .----------------------- Column type ID (4 bytes)
- *    |   .------------------- Null count N (4 bytes)
- *    |   |   .--------------- Null positions (4 x N bytes, empty if null count is zero)
- *    |   |   |     .--------- Non-null elements
- *    V   V   V     V
- *   +---+---+-----+---------+
- *   |   |   | ... | ... ... |
- *   +---+---+-----+---------+
+ *    .------------------- Null count N (4 bytes)
+ *    |   .--------------- Null positions (4 x N bytes, empty if null count is zero)
+ *    |   |     .--------- Non-null elements
+ *    V   V     V
+ *   +---+-----+---------+
+ *   |   | ... | ... ... |
+ *   +---+-----+---------+
 * }}}
 */
 private[sql] trait NullableColumnBuilder extends ColumnBuilder {
@@ -66,16 +65,14 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
 
   abstract override def build(): ByteBuffer = {
     val nonNulls = super.build()
-    val typeId = nonNulls.getInt()
     val nullDataLen = nulls.position()
 
     nulls.limit(nullDataLen)
     nulls.rewind()
 
     val buffer = ByteBuffer
-      .allocate(4 + 4 + nullDataLen + nonNulls.remaining())
+      .allocate(4 + nullDataLen + nonNulls.remaining())
       .order(ByteOrder.nativeOrder())
-      .putInt(typeId)
       .putInt(nullCount)
       .put(nulls)
       .put(nonNulls)
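
To make the new nullable layout concrete, a self-contained sketch that writes
and then parses a buffer in the post-commit format (toy values, plain java.nio,
no Spark classes involved):

import java.nio.{ByteBuffer, ByteOrder}

object NullableLayoutSketch {
  def main(args: Array[String]): Unit = {
    // Build: an Int column with logical rows (7, null, 9) -> null at row 1.
    val buf = ByteBuffer.allocate(4 + 4 * 1 + 4 * 2).order(ByteOrder.nativeOrder)
    buf.putInt(1) // null count N = 1
    buf.putInt(1) // null position: row 1
    buf.putInt(7) // non-null elements follow the header directly;
    buf.putInt(9) // there is no 4-byte column type ID prefix anymore
    buf.flip()

    // Parse: read the header, then the remaining bytes are element data.
    val nullCount = buf.getInt()
    val nullPositions = Array.fill(nullCount)(buf.getInt())
    println(s"nulls at rows ${nullPositions.mkString(",")}") // nulls at rows 1
    println(s"first non-null value: ${buf.getInt()}")        // 7
  }
}
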

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 39b21dd..161021f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -28,17 +28,16 @@ import org.apache.spark.sql.types.AtomicType
  * A stackable trait that builds optionally compressed byte buffer for a column.  Memory layout of
  * the final byte buffer is:
  * {{{
- *    .--------------------------- Column type ID (4 bytes)
- *    |   .----------------------- Null count N (4 bytes)
- *    |   |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
- *    |   |   |     .------------- Compression scheme ID (4 bytes)
- *    |   |   |     |   .--------- Compressed non-null elements
- *    V   V   V     V   V
- *   +---+---+-----+---+---------+
- *   |   |   | ... |   | ... ... |
- *   +---+---+-----+---+---------+
- *    \-----------/ \-----------/
- *       header         body
+ *    .----------------------- Null count N (4 bytes)
+ *    |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
+ *    |   |     .------------- Compression scheme ID (4 bytes)
+ *    |   |     |   .--------- Compressed non-null elements
+ *    V   V     V   V
+ *   +---+-----+---+---------+
+ *   |   | ... |   | ... ... |
+ *   +---+-----+---+---------+
+ *    \-------/ \-----------/
+ *     header       body
 * }}}
 */
 private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
@@ -83,14 +82,13 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
 
   override def build(): ByteBuffer = {
     val nonNullBuffer = buildNonNulls()
-    val typeId = nonNullBuffer.getInt()
     val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder(columnType)
     }
 
-    // Header = column type ID + null count + null positions
-    val headerSize = 4 + 4 + nulls.limit()
+    // Header = null count + null positions
+    val headerSize = 4 + nulls.limit()
     val compressedSize = if (encoder.compressedSize == 0) {
       nonNullBuffer.remaining()
     } else {
@@ -102,7 +100,6 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
       .allocate(headerSize + 4 + compressedSize)
       .order(ByteOrder.nativeOrder)
       // Write the header
-      .putInt(typeId)
       .putInt(nullCount)
       .put(nulls)
 
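
The compressed framing differs from the nullable one only by the 4-byte
compression scheme ID sitting between header and body. A sketch of assembling
such a buffer, with a placeholder scheme ID of 0 and a pretend-compressed body
(illustrative values, not Spark's encoder machinery):

import java.nio.{ByteBuffer, ByteOrder}

object CompressedLayoutSketch {
  def main(args: Array[String]): Unit = {
    val nullCount = 0
    val body = Array[Byte](1, 0, 0, 0) // pretend-compressed elements

    // header = null count + null positions (typeId no longer included)
    val headerSize = 4 + 4 * nullCount
    val buf = ByteBuffer
      .allocate(headerSize + 4 + body.length) // + 4 for the scheme ID
      .order(ByteOrder.nativeOrder)
      .putInt(nullCount) // header
      .putInt(0)         // compression scheme ID (placeholder value)
      .put(body)         // body
    buf.flip()
    assert(buf.remaining == 12)
  }
}
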

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index b1ef9b2..9322b77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -74,8 +74,8 @@ private[sql] object CompressionScheme {
 
   def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
     val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
-    val nullCount = header.getInt(4)
-    // Column type ID + null count + null positions
-    4 + 4 + 4 * nullCount
+    val nullCount = header.getInt()
+    // null count + null positions
+    4 + 4 * nullCount
   }
 }
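
A worked example of the new header arithmetic, mirroring the logic of
columnHeaderSize above rather than calling Spark's private API (values are
illustrative):

import java.nio.{ByteBuffer, ByteOrder}

object HeaderSizeSketch {
  // The header is now just [null count : 4 bytes][null positions : 4*N bytes],
  // so the null count is read at offset 0 instead of offset 4.
  def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
    val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
    val nullCount = header.getInt()
    4 + 4 * nullCount
  }

  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder)
    buf.putInt(3).putInt(0).putInt(2).putInt(5) // N = 3 nulls at rows 0, 2, 5
    buf.flip()
    assert(columnHeaderSize(buf) == 16) // 4 + 4 * 3
  }
}
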

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
index d0430d2..708fb4c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
@@ -27,10 +27,7 @@ class ColumnStatsSuite extends SparkFunSuite {
   testColumnStats(classOf[ByteColumnStats], BYTE, createRow(Byte.MaxValue, Byte.MinValue, 0))
   testColumnStats(classOf[ShortColumnStats], SHORT, createRow(Short.MaxValue, Short.MinValue, 0))
   testColumnStats(classOf[IntColumnStats], INT, createRow(Int.MaxValue, Int.MinValue, 0))
-  testColumnStats(classOf[DateColumnStats], DATE, createRow(Int.MaxValue, Int.MinValue, 0))
   testColumnStats(classOf[LongColumnStats], LONG, createRow(Long.MaxValue, Long.MinValue, 0))
-  testColumnStats(classOf[TimestampColumnStats], TIMESTAMP,
-    createRow(Long.MaxValue, Long.MinValue, 0))
   testColumnStats(classOf[FloatColumnStats], FLOAT, createRow(Float.MaxValue, Float.MinValue, 0))
   testColumnStats(classOf[DoubleColumnStats], DOUBLE,
     createRow(Double.MaxValue, Double.MinValue, 0))


http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 8f02469..a4cbe35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -37,8 +37,8 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
 
   test("defaultSize") {
     val checks = Map(
-      BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4, DATE -> 4,
-      LONG -> 8, TIMESTAMP -> 8, FLOAT -> 4, DOUBLE -> 8,
+      BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4,
+      LONG -> 8, FLOAT -> 4, DOUBLE -> 8,
       STRING -> 8, BINARY -> 16, FIXED_DECIMAL(15, 10) -> 8,
       MAP_GENERIC -> 16)
 
@@ -66,9 +66,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
     checkActualSize(BYTE, Byte.MaxValue, 1)
     checkActualSize(SHORT, Short.MaxValue, 2)
     checkActualSize(INT, Int.MaxValue, 4)
-    checkActualSize(DATE, Int.MaxValue, 4)
     checkActualSize(LONG, Long.MaxValue, 8)
-    checkActualSize(TIMESTAMP, Long.MaxValue, 8)
     checkActualSize(FLOAT, Float.MaxValue, 4)
     checkActualSize(DOUBLE, Double.MaxValue, 8)
     checkActualSize(STRING, UTF8String.fromString("hello"), 4 + "hello".getBytes("utf-8").length)
@@ -93,12 +91,8 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
 
   testNativeColumnType(INT)(_.putInt(_), _.getInt)
 
-  testNativeColumnType(DATE)(_.putInt(_), _.getInt)
-
   testNativeColumnType(LONG)(_.putLong(_), _.getLong)
 
-  testNativeColumnType(TIMESTAMP)(_.putLong(_), _.getLong)
-
   testNativeColumnType(FLOAT)(_.putFloat(_), _.getFloat)
 
   testNativeColumnType(DOUBLE)(_.putDouble(_), _.getDouble)


http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index 79bb7d0..123a705 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.columnar
 
 import scala.collection.immutable.HashSet
 import scala.util.Random
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{DataType, Decimal, AtomicType}
+import org.apache.spark.sql.types.{AtomicType, Decimal}
 import org.apache.spark.unsafe.types.UTF8String
 
 object ColumnarTestUtils {
@@ -43,9 +44,7 @@ object ColumnarTestUtils {
     case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
     case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
     case INT => Random.nextInt()
-    case DATE => Random.nextInt()
     case LONG => Random.nextLong()
-    case TIMESTAMP => Random.nextLong()
     case FLOAT => Random.nextFloat()
     case DOUBLE => Random.nextDouble()
     case STRING => UTF8String.fromString(Random.nextString(Random.nextInt(32)))

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
index f4f6c76..a3a23d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{StringType, ArrayType, DataType}
+import org.apache.spark.sql.types.{ArrayType, StringType}
 
 class TestNullableColumnAccessor[JvmType](
   buffer: ByteBuffer,
@@ -32,17 +32,15 @@ class TestNullableColumnAccessor[JvmType](
 object TestNullableColumnAccessor {
   def apply[JvmType](buffer: ByteBuffer, columnType: ColumnType[JvmType])
     : TestNullableColumnAccessor[JvmType] = {
-    // Skips the column type ID
-    buffer.getInt()
     new TestNullableColumnAccessor(buffer, columnType)
   }
 }
 
 class NullableColumnAccessorSuite extends SparkFunSuite {
-  import ColumnarTestUtils._
+  import org.apache.spark.sql.columnar.ColumnarTestUtils._
 
   Seq(
-    BOOLEAN, BYTE, SHORT, INT, DATE, LONG, TIMESTAMP, FLOAT, DOUBLE,
+    BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
     STRING, BINARY, FIXED_DECIMAL(15, 10), GENERIC(ArrayType(StringType)))
     .foreach {
       testNullableColumnAccessor(_)


http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
index 241d09e..9557eea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
@@ -38,7 +38,7 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
   import ColumnarTestUtils._
 
   Seq(
-    BOOLEAN, BYTE, SHORT, INT, DATE, LONG, TIMESTAMP, FLOAT, DOUBLE,
+    BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
     STRING, BINARY, FIXED_DECIMAL(15, 10), GENERIC(ArrayType(StringType)))
     .foreach {
       testNullableColumnBuilder(_)
@@ -53,7 +53,6 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
       val columnBuilder = TestNullableColumnBuilder(columnType)
       val buffer = columnBuilder.build()
 
-      assertResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
       assertResult(0, "Wrong null count")(buffer.getInt())
       assert(!buffer.hasRemaining)
     }
@@ -68,7 +67,6 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
 
       val buffer = columnBuilder.build()
 
-      assertResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
       assertResult(0, "Wrong null count")(buffer.getInt())
     }
 
@@ -84,7 +82,6 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
 
       val buffer = columnBuilder.build()
 
-      assertResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
       assertResult(4, "Wrong null count")(buffer.getInt())
 
       // For null positions


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org