From: rxin@apache.org
To: commits@spark.apache.org
Date: Thu, 19 Nov 2015 22:48:34 -0000
Subject: [5/5] spark git commit: [SPARK-11858][SQL] Move sql.columnar into sql.execution.

[SPARK-11858][SQL] Move sql.columnar into sql.execution.

In addition, tightened visibility of a lot of classes in the columnar package from private[sql] to private[columnar].

Author: Reynold Xin

Closes #9842 from rxin/SPARK-11858.
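For readers skimming the diff below: the user-visible effect is a package move (org.apache.spark.sql.columnar becomes org.apache.spark.sql.execution.columnar) plus tighter access modifiers; the class bodies are unchanged. As a minimal, illustrative sketch of the relocated API (not part of this commit; because the classes remain private[sql]/private[columnar], code like this can only live inside Spark's own sql packages, as the relocated test suites do, and the names and values here are made up for illustration), a column can be built from internal rows and read back roughly as follows:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
    import org.apache.spark.sql.execution.columnar.{ColumnAccessor, ColumnBuilder}  // new location after this commit
    import org.apache.spark.sql.types.IntegerType

    // Append a few values into an in-memory column and finalize the buffer.
    val builder = ColumnBuilder(IntegerType, initialSize = 4, useCompression = false)
    Seq(1, 2, 3).foreach(i => builder.appendFrom(InternalRow(i), 0))
    val buffer = builder.build()

    // Read the values back; extractTo() writes into a MutableRow to avoid boxing.
    val accessor = ColumnAccessor(IntegerType, buffer)
    val row = new GenericMutableRow(1)
    while (accessor.hasNext) {
      accessor.extractTo(row, 0)
      println(row.getInt(0))
    }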
(cherry picked from commit 014c0f7a9dfdb1686fa9aeacaadb2a17a855a943)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea1a51fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea1a51fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea1a51fc

Branch: refs/heads/branch-1.6
Commit: ea1a51fc12d2e9558a5f322e915fdd46a3ae7357
Parents: abe3930
Author: Reynold Xin
Authored: Thu Nov 19 14:48:18 2015 -0800
Committer: Reynold Xin
Committed: Thu Nov 19 14:48:25 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/columnar/ColumnAccessor.scala     | 148 ----
 .../spark/sql/columnar/ColumnBuilder.scala      | 189 -----
 .../apache/spark/sql/columnar/ColumnStats.scala | 271 --------
 .../apache/spark/sql/columnar/ColumnType.scala  | 689 -------------------
 .../sql/columnar/GenerateColumnAccessor.scala   | 195 ------
 .../columnar/InMemoryColumnarTableScan.scala    | 345 ----------
 .../sql/columnar/NullableColumnAccessor.scala   |  59 --
 .../sql/columnar/NullableColumnBuilder.scala    |  88 ---
 .../CompressibleColumnAccessor.scala            |  39 --
 .../compression/CompressibleColumnBuilder.scala | 109 ---
 .../compression/CompressionScheme.scala         |  81 ---
 .../compression/compressionSchemes.scala        | 532 --------------
 .../spark/sql/execution/CacheManager.scala      |   2 +-
 .../spark/sql/execution/SparkStrategies.scala   |   2 +-
 .../sql/execution/columnar/ColumnAccessor.scala | 148 ++++
 .../sql/execution/columnar/ColumnBuilder.scala  | 194 ++++++
 .../sql/execution/columnar/ColumnStats.scala    | 271 ++++++++
 .../sql/execution/columnar/ColumnType.scala     | 689 +++++++++++++++++++
 .../columnar/GenerateColumnAccessor.scala       | 195 ++++++
 .../columnar/InMemoryColumnarTableScan.scala    | 346 ++++++++++
 .../columnar/NullableColumnAccessor.scala       |  59 ++
 .../columnar/NullableColumnBuilder.scala        |  88 +++
 .../CompressibleColumnAccessor.scala            |  39 ++
 .../compression/CompressibleColumnBuilder.scala | 109 +++
 .../compression/CompressionScheme.scala         |  81 +++
 .../compression/compressionSchemes.scala        | 532 ++++++++++++++
 .../apache/spark/sql/execution/package.scala    |   2 +
 .../org/apache/spark/sql/CachedTableSuite.scala |   4 +-
 .../scala/org/apache/spark/sql/QueryTest.scala  |   2 +-
 .../spark/sql/columnar/ColumnStatsSuite.scala   | 110 ---
 .../spark/sql/columnar/ColumnTypeSuite.scala    | 145 ----
 .../spark/sql/columnar/ColumnarTestUtils.scala  | 108 ---
 .../columnar/InMemoryColumnarQuerySuite.scala   | 222 ------
 .../columnar/NullableColumnAccessorSuite.scala  |  92 ---
 .../columnar/NullableColumnBuilderSuite.scala   | 107 ---
 .../columnar/PartitionBatchPruningSuite.scala   | 127 ----
 .../compression/BooleanBitSetSuite.scala        | 107 ---
 .../compression/DictionaryEncodingSuite.scala   | 128 ----
 .../compression/IntegralDeltaSuite.scala        | 131 ----
 .../compression/RunLengthEncodingSuite.scala    | 114 ---
 .../TestCompressibleColumnBuilder.scala         |  44 --
 .../execution/columnar/ColumnStatsSuite.scala   | 110 +++
 .../execution/columnar/ColumnTypeSuite.scala    | 145 ++++
 .../execution/columnar/ColumnarTestUtils.scala  | 108 +++
 .../columnar/InMemoryColumnarQuerySuite.scala   | 222 ++++++
 .../columnar/NullableColumnAccessorSuite.scala  |  92 +++
 .../columnar/NullableColumnBuilderSuite.scala   | 107 +++
 .../columnar/PartitionBatchPruningSuite.scala   | 127 ++++
 .../compression/BooleanBitSetSuite.scala        | 107 +++
 .../compression/DictionaryEncodingSuite.scala   | 128 ++++
 .../compression/IntegralDeltaSuite.scala        | 131 ++++
 .../compression/RunLengthEncodingSuite.scala    | 114 +++
 .../TestCompressibleColumnBuilder.scala         |  44 ++
 .../spark/sql/hive/CachedTableSuite.scala       |   2 +-
 54 files changed, 4194 insertions(+), 4186 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
deleted file mode 100644
index 42ec4d3..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow}
-import org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor
-import org.apache.spark.sql.types._
-
-/**
- * An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
- * extracted from the buffer, instead of directly returning it, the value is set into some field of
- * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
- * for primitive values provided by [[MutableRow]].
- */ -private[sql] trait ColumnAccessor { - initialize() - - protected def initialize() - - def hasNext: Boolean - - def extractTo(row: MutableRow, ordinal: Int) - - protected def underlyingBuffer: ByteBuffer -} - -private[sql] abstract class BasicColumnAccessor[JvmType]( - protected val buffer: ByteBuffer, - protected val columnType: ColumnType[JvmType]) - extends ColumnAccessor { - - protected def initialize() {} - - override def hasNext: Boolean = buffer.hasRemaining - - override def extractTo(row: MutableRow, ordinal: Int): Unit = { - extractSingle(row, ordinal) - } - - def extractSingle(row: MutableRow, ordinal: Int): Unit = { - columnType.extract(buffer, row, ordinal) - } - - protected def underlyingBuffer = buffer -} - -private[sql] class NullColumnAccessor(buffer: ByteBuffer) - extends BasicColumnAccessor[Any](buffer, NULL) - with NullableColumnAccessor - -private[sql] abstract class NativeColumnAccessor[T <: AtomicType]( - override protected val buffer: ByteBuffer, - override protected val columnType: NativeColumnType[T]) - extends BasicColumnAccessor(buffer, columnType) - with NullableColumnAccessor - with CompressibleColumnAccessor[T] - -private[sql] class BooleanColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, BOOLEAN) - -private[sql] class ByteColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, BYTE) - -private[sql] class ShortColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, SHORT) - -private[sql] class IntColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, INT) - -private[sql] class LongColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, LONG) - -private[sql] class FloatColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, FLOAT) - -private[sql] class DoubleColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, DOUBLE) - -private[sql] class StringColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, STRING) - -private[sql] class BinaryColumnAccessor(buffer: ByteBuffer) - extends BasicColumnAccessor[Array[Byte]](buffer, BINARY) - with NullableColumnAccessor - -private[sql] class CompactDecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType) - extends NativeColumnAccessor(buffer, COMPACT_DECIMAL(dataType)) - -private[sql] class DecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType) - extends BasicColumnAccessor[Decimal](buffer, LARGE_DECIMAL(dataType)) - with NullableColumnAccessor - -private[sql] class StructColumnAccessor(buffer: ByteBuffer, dataType: StructType) - extends BasicColumnAccessor[UnsafeRow](buffer, STRUCT(dataType)) - with NullableColumnAccessor - -private[sql] class ArrayColumnAccessor(buffer: ByteBuffer, dataType: ArrayType) - extends BasicColumnAccessor[UnsafeArrayData](buffer, ARRAY(dataType)) - with NullableColumnAccessor - -private[sql] class MapColumnAccessor(buffer: ByteBuffer, dataType: MapType) - extends BasicColumnAccessor[UnsafeMapData](buffer, MAP(dataType)) - with NullableColumnAccessor - -private[sql] object ColumnAccessor { - def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = { - val buf = buffer.order(ByteOrder.nativeOrder) - - dataType match { - case NullType => new NullColumnAccessor(buf) - case BooleanType => new BooleanColumnAccessor(buf) - case ByteType => new ByteColumnAccessor(buf) - case ShortType => new ShortColumnAccessor(buf) - case IntegerType | DateType => new IntColumnAccessor(buf) - case LongType | TimestampType => new 
LongColumnAccessor(buf) - case FloatType => new FloatColumnAccessor(buf) - case DoubleType => new DoubleColumnAccessor(buf) - case StringType => new StringColumnAccessor(buf) - case BinaryType => new BinaryColumnAccessor(buf) - case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => - new CompactDecimalColumnAccessor(buf, dt) - case dt: DecimalType => new DecimalColumnAccessor(buf, dt) - case struct: StructType => new StructColumnAccessor(buf, struct) - case array: ArrayType => new ArrayColumnAccessor(buf, array) - case map: MapType => new MapColumnAccessor(buf, map) - case udt: UserDefinedType[_] => ColumnAccessor(udt.sqlType, buffer) - case other => - throw new Exception(s"not support type: $other") - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala deleted file mode 100644 index 599f30f..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.nio.{ByteBuffer, ByteOrder} - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.columnar.ColumnBuilder._ -import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder} -import org.apache.spark.sql.types._ - -private[sql] trait ColumnBuilder { - /** - * Initializes with an approximate lower bound on the expected number of elements in this column. - */ - def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false) - - /** - * Appends `row(ordinal)` to the column builder. - */ - def appendFrom(row: InternalRow, ordinal: Int) - - /** - * Column statistics information - */ - def columnStats: ColumnStats - - /** - * Returns the final columnar byte buffer. 
- */ - def build(): ByteBuffer -} - -private[sql] class BasicColumnBuilder[JvmType]( - val columnStats: ColumnStats, - val columnType: ColumnType[JvmType]) - extends ColumnBuilder { - - protected var columnName: String = _ - - protected var buffer: ByteBuffer = _ - - override def initialize( - initialSize: Int, - columnName: String = "", - useCompression: Boolean = false): Unit = { - - val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize - this.columnName = columnName - - buffer = ByteBuffer.allocate(size * columnType.defaultSize) - buffer.order(ByteOrder.nativeOrder()) - } - - override def appendFrom(row: InternalRow, ordinal: Int): Unit = { - buffer = ensureFreeSpace(buffer, columnType.actualSize(row, ordinal)) - columnType.append(row, ordinal, buffer) - } - - override def build(): ByteBuffer = { - if (buffer.capacity() > buffer.position() * 1.1) { - // trim the buffer - buffer = ByteBuffer - .allocate(buffer.position()) - .order(ByteOrder.nativeOrder()) - .put(buffer.array(), 0, buffer.position()) - } - buffer.flip().asInstanceOf[ByteBuffer] - } -} - -private[sql] class NullColumnBuilder - extends BasicColumnBuilder[Any](new ObjectColumnStats(NullType), NULL) - with NullableColumnBuilder - -private[sql] abstract class ComplexColumnBuilder[JvmType]( - columnStats: ColumnStats, - columnType: ColumnType[JvmType]) - extends BasicColumnBuilder[JvmType](columnStats, columnType) - with NullableColumnBuilder - -private[sql] abstract class NativeColumnBuilder[T <: AtomicType]( - override val columnStats: ColumnStats, - override val columnType: NativeColumnType[T]) - extends BasicColumnBuilder[T#InternalType](columnStats, columnType) - with NullableColumnBuilder - with AllCompressionSchemes - with CompressibleColumnBuilder[T] - -private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN) - -private[sql] class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE) - -private[sql] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT) - -private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT) - -private[sql] class LongColumnBuilder extends NativeColumnBuilder(new LongColumnStats, LONG) - -private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT) - -private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE) - -private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING) - -private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY) - -private[sql] class CompactDecimalColumnBuilder(dataType: DecimalType) - extends NativeColumnBuilder(new DecimalColumnStats(dataType), COMPACT_DECIMAL(dataType)) - -private[sql] class DecimalColumnBuilder(dataType: DecimalType) - extends ComplexColumnBuilder(new DecimalColumnStats(dataType), LARGE_DECIMAL(dataType)) - -private[sql] class StructColumnBuilder(dataType: StructType) - extends ComplexColumnBuilder(new ObjectColumnStats(dataType), STRUCT(dataType)) - -private[sql] class ArrayColumnBuilder(dataType: ArrayType) - extends ComplexColumnBuilder(new ObjectColumnStats(dataType), ARRAY(dataType)) - -private[sql] class MapColumnBuilder(dataType: MapType) - extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType)) - -private[sql] object ColumnBuilder { - val DEFAULT_INITIAL_BUFFER_SIZE = 128 * 1024 - val MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024L - - 
private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = { - if (orig.remaining >= size) { - orig - } else { - // grow in steps of initial size - val capacity = orig.capacity() - val newSize = capacity + size.max(capacity) - val pos = orig.position() - - ByteBuffer - .allocate(newSize) - .order(ByteOrder.nativeOrder()) - .put(orig.array(), 0, pos) - } - } - - def apply( - dataType: DataType, - initialSize: Int = 0, - columnName: String = "", - useCompression: Boolean = false): ColumnBuilder = { - val builder: ColumnBuilder = dataType match { - case NullType => new NullColumnBuilder - case BooleanType => new BooleanColumnBuilder - case ByteType => new ByteColumnBuilder - case ShortType => new ShortColumnBuilder - case IntegerType | DateType => new IntColumnBuilder - case LongType | TimestampType => new LongColumnBuilder - case FloatType => new FloatColumnBuilder - case DoubleType => new DoubleColumnBuilder - case StringType => new StringColumnBuilder - case BinaryType => new BinaryColumnBuilder - case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => - new CompactDecimalColumnBuilder(dt) - case dt: DecimalType => new DecimalColumnBuilder(dt) - case struct: StructType => new StructColumnBuilder(struct) - case array: ArrayType => new ArrayColumnBuilder(array) - case map: MapType => new MapColumnBuilder(map) - case udt: UserDefinedType[_] => - return apply(udt.sqlType, initialSize, columnName, useCompression) - case other => - throw new Exception(s"not suppported type: $other") - } - - builder.initialize(initialSize, columnName, useCompression) - builder - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala deleted file mode 100644 index 91a0565..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Attribute, AttributeMap, AttributeReference} -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable { - val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)() - val lowerBound = AttributeReference(a.name + ".lowerBound", a.dataType, nullable = true)() - val nullCount = AttributeReference(a.name + ".nullCount", IntegerType, nullable = false)() - val count = AttributeReference(a.name + ".count", IntegerType, nullable = false)() - val sizeInBytes = AttributeReference(a.name + ".sizeInBytes", LongType, nullable = false)() - - val schema = Seq(lowerBound, upperBound, nullCount, count, sizeInBytes) -} - -private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable { - val (forAttribute, schema) = { - val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a)) - (AttributeMap(allStats), allStats.map(_._2.schema).foldLeft(Seq.empty[Attribute])(_ ++ _)) - } -} - -/** - * Used to collect statistical information when building in-memory columns. - * - * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]` - * brings significant performance penalty. - */ -private[sql] sealed trait ColumnStats extends Serializable { - protected var count = 0 - protected var nullCount = 0 - private[sql] var sizeInBytes = 0L - - /** - * Gathers statistics information from `row(ordinal)`. - */ - def gatherStats(row: InternalRow, ordinal: Int): Unit = { - if (row.isNullAt(ordinal)) { - nullCount += 1 - // 4 bytes for null position - sizeInBytes += 4 - } - count += 1 - } - - /** - * Column statistics represented as a single row, currently including closed lower bound, closed - * upper bound and null count. - */ - def collectedStatistics: GenericInternalRow -} - -/** - * A no-op ColumnStats only used for testing purposes. 
- */ -private[sql] class NoopColumnStats extends ColumnStats { - override def gatherStats(row: InternalRow, ordinal: Int): Unit = super.gatherStats(row, ordinal) - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](null, null, nullCount, count, 0L)) -} - -private[sql] class BooleanColumnStats extends ColumnStats { - protected var upper = false - protected var lower = true - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getBoolean(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += BOOLEAN.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class ByteColumnStats extends ColumnStats { - protected var upper = Byte.MinValue - protected var lower = Byte.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getByte(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += BYTE.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class ShortColumnStats extends ColumnStats { - protected var upper = Short.MinValue - protected var lower = Short.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getShort(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += SHORT.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class IntColumnStats extends ColumnStats { - protected var upper = Int.MinValue - protected var lower = Int.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getInt(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += INT.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class LongColumnStats extends ColumnStats { - protected var upper = Long.MinValue - protected var lower = Long.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getLong(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += LONG.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class FloatColumnStats extends ColumnStats { - protected var upper = Float.MinValue - protected var lower = Float.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getFloat(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += FLOAT.defaultSize - } - } - - override def 
collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class DoubleColumnStats extends ColumnStats { - protected var upper = Double.MinValue - protected var lower = Double.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getDouble(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += DOUBLE.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class StringColumnStats extends ColumnStats { - protected var upper: UTF8String = null - protected var lower: UTF8String = null - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getUTF8String(ordinal) - if (upper == null || value.compareTo(upper) > 0) upper = value.clone() - if (lower == null || value.compareTo(lower) < 0) lower = value.clone() - sizeInBytes += STRING.actualSize(row, ordinal) - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class BinaryColumnStats extends ColumnStats { - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - sizeInBytes += BINARY.actualSize(row, ordinal) - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes)) -} - -private[sql] class DecimalColumnStats(precision: Int, scale: Int) extends ColumnStats { - def this(dt: DecimalType) = this(dt.precision, dt.scale) - - protected var upper: Decimal = null - protected var lower: Decimal = null - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getDecimal(ordinal, precision, scale) - if (upper == null || value.compareTo(upper) > 0) upper = value - if (lower == null || value.compareTo(lower) < 0) lower = value - // TODO: this is not right for DecimalType with precision > 18 - sizeInBytes += 8 - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class ObjectColumnStats(dataType: DataType) extends ColumnStats { - val columnType = ColumnType(dataType) - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - sizeInBytes += columnType.actualSize(row, ordinal) - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes)) -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala deleted file mode 100644 index 68e509e..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ /dev/null @@ -1,689 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.math.{BigDecimal, BigInteger} -import java.nio.ByteBuffer - -import scala.reflect.runtime.universe.TypeTag - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.Platform -import org.apache.spark.unsafe.types.UTF8String - - -/** - * A help class for fast reading Int/Long/Float/Double from ByteBuffer in native order. - * - * Note: There is not much difference between ByteBuffer.getByte/getShort and - * Unsafe.getByte/getShort, so we do not have helper methods for them. - * - * The unrolling (building columnar cache) is already slow, putLong/putDouble will not help much, - * so we do not have helper methods for them. - * - * - * WARNNING: This only works with HeapByteBuffer - */ -object ByteBufferHelper { - def getInt(buffer: ByteBuffer): Int = { - val pos = buffer.position() - buffer.position(pos + 4) - Platform.getInt(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) - } - - def getLong(buffer: ByteBuffer): Long = { - val pos = buffer.position() - buffer.position(pos + 8) - Platform.getLong(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) - } - - def getFloat(buffer: ByteBuffer): Float = { - val pos = buffer.position() - buffer.position(pos + 4) - Platform.getFloat(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) - } - - def getDouble(buffer: ByteBuffer): Double = { - val pos = buffer.position() - buffer.position(pos + 8) - Platform.getDouble(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) - } -} - -/** - * An abstract class that represents type of a column. Used to append/extract Java objects into/from - * the underlying [[ByteBuffer]] of a column. - * - * @tparam JvmType Underlying Java type to represent the elements. - */ -private[sql] sealed abstract class ColumnType[JvmType] { - - // The catalyst data type of this column. - def dataType: DataType - - // Default size in bytes for one element of type T (e.g. 4 for `Int`). - def defaultSize: Int - - /** - * Extracts a value out of the buffer at the buffer's current position. - */ - def extract(buffer: ByteBuffer): JvmType - - /** - * Extracts a value out of the buffer at the buffer's current position and stores in - * `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs whenever - * possible. - */ - def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - setField(row, ordinal, extract(buffer)) - } - - /** - * Appends the given value v of type T into the given ByteBuffer. - */ - def append(v: JvmType, buffer: ByteBuffer): Unit - - /** - * Appends `row(ordinal)` of type T into the given ByteBuffer. 
Subclasses should override this - * method to avoid boxing/unboxing costs whenever possible. - */ - def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - append(getField(row, ordinal), buffer) - } - - /** - * Returns the size of the value `row(ordinal)`. This is used to calculate the size of variable - * length types such as byte arrays and strings. - */ - def actualSize(row: InternalRow, ordinal: Int): Int = defaultSize - - /** - * Returns `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs - * whenever possible. - */ - def getField(row: InternalRow, ordinal: Int): JvmType - - /** - * Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing - * costs whenever possible. - */ - def setField(row: MutableRow, ordinal: Int, value: JvmType): Unit - - /** - * Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid - * boxing/unboxing costs whenever possible. - */ - def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { - setField(to, toOrdinal, getField(from, fromOrdinal)) - } - - /** - * Creates a duplicated copy of the value. - */ - def clone(v: JvmType): JvmType = v - - override def toString: String = getClass.getSimpleName.stripSuffix("$") -} - -private[sql] object NULL extends ColumnType[Any] { - - override def dataType: DataType = NullType - override def defaultSize: Int = 0 - override def append(v: Any, buffer: ByteBuffer): Unit = {} - override def extract(buffer: ByteBuffer): Any = null - override def setField(row: MutableRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal) - override def getField(row: InternalRow, ordinal: Int): Any = null -} - -private[sql] abstract class NativeColumnType[T <: AtomicType]( - val dataType: T, - val defaultSize: Int) - extends ColumnType[T#InternalType] { - - /** - * Scala TypeTag. Can be used to create primitive arrays and hash tables. 
- */ - def scalaTag: TypeTag[dataType.InternalType] = dataType.tag -} - -private[sql] object INT extends NativeColumnType(IntegerType, 4) { - override def append(v: Int, buffer: ByteBuffer): Unit = { - buffer.putInt(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.putInt(row.getInt(ordinal)) - } - - override def extract(buffer: ByteBuffer): Int = { - ByteBufferHelper.getInt(buffer) - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setInt(ordinal, ByteBufferHelper.getInt(buffer)) - } - - override def setField(row: MutableRow, ordinal: Int, value: Int): Unit = { - row.setInt(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal) - - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setInt(toOrdinal, from.getInt(fromOrdinal)) - } -} - -private[sql] object LONG extends NativeColumnType(LongType, 8) { - override def append(v: Long, buffer: ByteBuffer): Unit = { - buffer.putLong(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.putLong(row.getLong(ordinal)) - } - - override def extract(buffer: ByteBuffer): Long = { - ByteBufferHelper.getLong(buffer) - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setLong(ordinal, ByteBufferHelper.getLong(buffer)) - } - - override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = { - row.setLong(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setLong(toOrdinal, from.getLong(fromOrdinal)) - } -} - -private[sql] object FLOAT extends NativeColumnType(FloatType, 4) { - override def append(v: Float, buffer: ByteBuffer): Unit = { - buffer.putFloat(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.putFloat(row.getFloat(ordinal)) - } - - override def extract(buffer: ByteBuffer): Float = { - ByteBufferHelper.getFloat(buffer) - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setFloat(ordinal, ByteBufferHelper.getFloat(buffer)) - } - - override def setField(row: MutableRow, ordinal: Int, value: Float): Unit = { - row.setFloat(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setFloat(toOrdinal, from.getFloat(fromOrdinal)) - } -} - -private[sql] object DOUBLE extends NativeColumnType(DoubleType, 8) { - override def append(v: Double, buffer: ByteBuffer): Unit = { - buffer.putDouble(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.putDouble(row.getDouble(ordinal)) - } - - override def extract(buffer: ByteBuffer): Double = { - ByteBufferHelper.getDouble(buffer) - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setDouble(ordinal, ByteBufferHelper.getDouble(buffer)) - } - - override def setField(row: MutableRow, ordinal: Int, value: Double): Unit = { - row.setDouble(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, 
toOrdinal: Int) { - to.setDouble(toOrdinal, from.getDouble(fromOrdinal)) - } -} - -private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 1) { - override def append(v: Boolean, buffer: ByteBuffer): Unit = { - buffer.put(if (v) 1: Byte else 0: Byte) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.put(if (row.getBoolean(ordinal)) 1: Byte else 0: Byte) - } - - override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1 - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setBoolean(ordinal, buffer.get() == 1) - } - - override def setField(row: MutableRow, ordinal: Int, value: Boolean): Unit = { - row.setBoolean(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal)) - } -} - -private[sql] object BYTE extends NativeColumnType(ByteType, 1) { - override def append(v: Byte, buffer: ByteBuffer): Unit = { - buffer.put(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.put(row.getByte(ordinal)) - } - - override def extract(buffer: ByteBuffer): Byte = { - buffer.get() - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setByte(ordinal, buffer.get()) - } - - override def setField(row: MutableRow, ordinal: Int, value: Byte): Unit = { - row.setByte(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setByte(toOrdinal, from.getByte(fromOrdinal)) - } -} - -private[sql] object SHORT extends NativeColumnType(ShortType, 2) { - override def append(v: Short, buffer: ByteBuffer): Unit = { - buffer.putShort(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.putShort(row.getShort(ordinal)) - } - - override def extract(buffer: ByteBuffer): Short = { - buffer.getShort() - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setShort(ordinal, buffer.getShort()) - } - - override def setField(row: MutableRow, ordinal: Int, value: Short): Unit = { - row.setShort(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setShort(toOrdinal, from.getShort(fromOrdinal)) - } -} - -/** - * A fast path to copy var-length bytes between ByteBuffer and UnsafeRow without creating wrapper - * objects. 
- */ -private[sql] trait DirectCopyColumnType[JvmType] extends ColumnType[JvmType] { - - // copy the bytes from ByteBuffer to UnsafeRow - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - if (row.isInstanceOf[MutableUnsafeRow]) { - val numBytes = buffer.getInt - val cursor = buffer.position() - buffer.position(cursor + numBytes) - row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, buffer.array(), - buffer.arrayOffset() + cursor, numBytes) - } else { - setField(row, ordinal, extract(buffer)) - } - } - - // copy the bytes from UnsafeRow to ByteBuffer - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - if (row.isInstanceOf[UnsafeRow]) { - row.asInstanceOf[UnsafeRow].writeFieldTo(ordinal, buffer) - } else { - super.append(row, ordinal, buffer) - } - } -} - -private[sql] object STRING - extends NativeColumnType(StringType, 8) with DirectCopyColumnType[UTF8String] { - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - row.getUTF8String(ordinal).numBytes() + 4 - } - - override def append(v: UTF8String, buffer: ByteBuffer): Unit = { - buffer.putInt(v.numBytes()) - v.writeTo(buffer) - } - - override def extract(buffer: ByteBuffer): UTF8String = { - val length = buffer.getInt() - val cursor = buffer.position() - buffer.position(cursor + length) - UTF8String.fromBytes(buffer.array(), buffer.arrayOffset() + cursor, length) - } - - override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = { - if (row.isInstanceOf[MutableUnsafeRow]) { - row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, value) - } else { - row.update(ordinal, value.clone()) - } - } - - override def getField(row: InternalRow, ordinal: Int): UTF8String = { - row.getUTF8String(ordinal) - } - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - setField(to, toOrdinal, getField(from, fromOrdinal)) - } - - override def clone(v: UTF8String): UTF8String = v.clone() -} - -private[sql] case class COMPACT_DECIMAL(precision: Int, scale: Int) - extends NativeColumnType(DecimalType(precision, scale), 8) { - - override def extract(buffer: ByteBuffer): Decimal = { - Decimal(ByteBufferHelper.getLong(buffer), precision, scale) - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - if (row.isInstanceOf[MutableUnsafeRow]) { - // copy it as Long - row.setLong(ordinal, ByteBufferHelper.getLong(buffer)) - } else { - setField(row, ordinal, extract(buffer)) - } - } - - override def append(v: Decimal, buffer: ByteBuffer): Unit = { - buffer.putLong(v.toUnscaledLong) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - if (row.isInstanceOf[UnsafeRow]) { - // copy it as Long - buffer.putLong(row.getLong(ordinal)) - } else { - append(getField(row, ordinal), buffer) - } - } - - override def getField(row: InternalRow, ordinal: Int): Decimal = { - row.getDecimal(ordinal, precision, scale) - } - - override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = { - row.setDecimal(ordinal, value, precision) - } - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - setField(to, toOrdinal, getField(from, fromOrdinal)) - } -} - -private[sql] object COMPACT_DECIMAL { - def apply(dt: DecimalType): COMPACT_DECIMAL = { - COMPACT_DECIMAL(dt.precision, dt.scale) - } -} - -private[sql] sealed abstract class ByteArrayColumnType[JvmType](val defaultSize: Int) - extends ColumnType[JvmType] 
with DirectCopyColumnType[JvmType] { - - def serialize(value: JvmType): Array[Byte] - def deserialize(bytes: Array[Byte]): JvmType - - override def append(v: JvmType, buffer: ByteBuffer): Unit = { - val bytes = serialize(v) - buffer.putInt(bytes.length).put(bytes, 0, bytes.length) - } - - override def extract(buffer: ByteBuffer): JvmType = { - val length = buffer.getInt() - val bytes = new Array[Byte](length) - buffer.get(bytes, 0, length) - deserialize(bytes) - } -} - -private[sql] object BINARY extends ByteArrayColumnType[Array[Byte]](16) { - - def dataType: DataType = BinaryType - - override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = { - row.update(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Array[Byte] = { - row.getBinary(ordinal) - } - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - row.getBinary(ordinal).length + 4 - } - - def serialize(value: Array[Byte]): Array[Byte] = value - def deserialize(bytes: Array[Byte]): Array[Byte] = bytes -} - -private[sql] case class LARGE_DECIMAL(precision: Int, scale: Int) - extends ByteArrayColumnType[Decimal](12) { - - override val dataType: DataType = DecimalType(precision, scale) - - override def getField(row: InternalRow, ordinal: Int): Decimal = { - row.getDecimal(ordinal, precision, scale) - } - - override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = { - row.setDecimal(ordinal, value, precision) - } - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - 4 + getField(row, ordinal).toJavaBigDecimal.unscaledValue().bitLength() / 8 + 1 - } - - override def serialize(value: Decimal): Array[Byte] = { - value.toJavaBigDecimal.unscaledValue().toByteArray - } - - override def deserialize(bytes: Array[Byte]): Decimal = { - val javaDecimal = new BigDecimal(new BigInteger(bytes), scale) - Decimal.apply(javaDecimal, precision, scale) - } -} - -private[sql] object LARGE_DECIMAL { - def apply(dt: DecimalType): LARGE_DECIMAL = { - LARGE_DECIMAL(dt.precision, dt.scale) - } -} - -private[sql] case class STRUCT(dataType: StructType) - extends ColumnType[UnsafeRow] with DirectCopyColumnType[UnsafeRow] { - - private val numOfFields: Int = dataType.fields.size - - override def defaultSize: Int = 20 - - override def setField(row: MutableRow, ordinal: Int, value: UnsafeRow): Unit = { - row.update(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): UnsafeRow = { - row.getStruct(ordinal, numOfFields).asInstanceOf[UnsafeRow] - } - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - 4 + getField(row, ordinal).getSizeInBytes - } - - override def append(value: UnsafeRow, buffer: ByteBuffer): Unit = { - buffer.putInt(value.getSizeInBytes) - value.writeTo(buffer) - } - - override def extract(buffer: ByteBuffer): UnsafeRow = { - val sizeInBytes = ByteBufferHelper.getInt(buffer) - assert(buffer.hasArray) - val cursor = buffer.position() - buffer.position(cursor + sizeInBytes) - val unsafeRow = new UnsafeRow - unsafeRow.pointTo( - buffer.array(), - Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, - numOfFields, - sizeInBytes) - unsafeRow - } - - override def clone(v: UnsafeRow): UnsafeRow = v.copy() -} - -private[sql] case class ARRAY(dataType: ArrayType) - extends ColumnType[UnsafeArrayData] with DirectCopyColumnType[UnsafeArrayData] { - - override def defaultSize: Int = 16 - - override def setField(row: MutableRow, ordinal: Int, value: UnsafeArrayData): Unit = { - row.update(ordinal, value) - } - - 
override def getField(row: InternalRow, ordinal: Int): UnsafeArrayData = { - row.getArray(ordinal).asInstanceOf[UnsafeArrayData] - } - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - val unsafeArray = getField(row, ordinal) - 4 + unsafeArray.getSizeInBytes - } - - override def append(value: UnsafeArrayData, buffer: ByteBuffer): Unit = { - buffer.putInt(value.getSizeInBytes) - value.writeTo(buffer) - } - - override def extract(buffer: ByteBuffer): UnsafeArrayData = { - val numBytes = buffer.getInt - assert(buffer.hasArray) - val cursor = buffer.position() - buffer.position(cursor + numBytes) - val array = new UnsafeArrayData - array.pointTo( - buffer.array(), - Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, - numBytes) - array - } - - override def clone(v: UnsafeArrayData): UnsafeArrayData = v.copy() -} - -private[sql] case class MAP(dataType: MapType) - extends ColumnType[UnsafeMapData] with DirectCopyColumnType[UnsafeMapData] { - - override def defaultSize: Int = 32 - - override def setField(row: MutableRow, ordinal: Int, value: UnsafeMapData): Unit = { - row.update(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): UnsafeMapData = { - row.getMap(ordinal).asInstanceOf[UnsafeMapData] - } - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - val unsafeMap = getField(row, ordinal) - 4 + unsafeMap.getSizeInBytes - } - - override def append(value: UnsafeMapData, buffer: ByteBuffer): Unit = { - buffer.putInt(value.getSizeInBytes) - value.writeTo(buffer) - } - - override def extract(buffer: ByteBuffer): UnsafeMapData = { - val numBytes = buffer.getInt - val cursor = buffer.position() - buffer.position(cursor + numBytes) - val map = new UnsafeMapData - map.pointTo( - buffer.array(), - Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, - numBytes) - map - } - - override def clone(v: UnsafeMapData): UnsafeMapData = v.copy() -} - -private[sql] object ColumnType { - def apply(dataType: DataType): ColumnType[_] = { - dataType match { - case NullType => NULL - case BooleanType => BOOLEAN - case ByteType => BYTE - case ShortType => SHORT - case IntegerType | DateType => INT - case LongType | TimestampType => LONG - case FloatType => FLOAT - case DoubleType => DOUBLE - case StringType => STRING - case BinaryType => BINARY - case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => COMPACT_DECIMAL(dt) - case dt: DecimalType => LARGE_DECIMAL(dt) - case arr: ArrayType => ARRAY(arr) - case map: MapType => MAP(map) - case struct: StructType => STRUCT(struct) - case udt: UserDefinedType[_] => apply(udt.sqlType) - case other => - throw new Exception(s"Unsupported type: $other") - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala deleted file mode 100644 index ff9393b..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, CodeFormatter, CodeGenerator} -import org.apache.spark.sql.types._ - -/** - * An Iterator to walk through the InternalRows from a CachedBatch - */ -abstract class ColumnarIterator extends Iterator[InternalRow] { - def initialize(input: Iterator[CachedBatch], columnTypes: Array[DataType], - columnIndexes: Array[Int]): Unit -} - -/** - * An helper class to update the fields of UnsafeRow, used by ColumnAccessor - * - * WARNING: These setter MUST be called in increasing order of ordinals. - */ -class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) { - - override def isNullAt(i: Int): Boolean = writer.isNullAt(i) - override def setNullAt(i: Int): Unit = writer.setNullAt(i) - - override def setBoolean(i: Int, v: Boolean): Unit = writer.write(i, v) - override def setByte(i: Int, v: Byte): Unit = writer.write(i, v) - override def setShort(i: Int, v: Short): Unit = writer.write(i, v) - override def setInt(i: Int, v: Int): Unit = writer.write(i, v) - override def setLong(i: Int, v: Long): Unit = writer.write(i, v) - override def setFloat(i: Int, v: Float): Unit = writer.write(i, v) - override def setDouble(i: Int, v: Double): Unit = writer.write(i, v) - - // the writer will be used directly to avoid creating wrapper objects - override def setDecimal(i: Int, v: Decimal, precision: Int): Unit = - throw new UnsupportedOperationException - override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException - - // all other methods inherited from GenericMutableRow are not need -} - -/** - * Generates bytecode for an [[ColumnarIterator]] for columnar cache. 
- */ -object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarIterator] with Logging { - - protected def canonicalize(in: Seq[DataType]): Seq[DataType] = in - protected def bind(in: Seq[DataType], inputSchema: Seq[Attribute]): Seq[DataType] = in - - protected def create(columnTypes: Seq[DataType]): ColumnarIterator = { - val ctx = newCodeGenContext() - val numFields = columnTypes.size - val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) => - val accessorName = ctx.freshName("accessor") - val accessorCls = dt match { - case NullType => classOf[NullColumnAccessor].getName - case BooleanType => classOf[BooleanColumnAccessor].getName - case ByteType => classOf[ByteColumnAccessor].getName - case ShortType => classOf[ShortColumnAccessor].getName - case IntegerType | DateType => classOf[IntColumnAccessor].getName - case LongType | TimestampType => classOf[LongColumnAccessor].getName - case FloatType => classOf[FloatColumnAccessor].getName - case DoubleType => classOf[DoubleColumnAccessor].getName - case StringType => classOf[StringColumnAccessor].getName - case BinaryType => classOf[BinaryColumnAccessor].getName - case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => - classOf[CompactDecimalColumnAccessor].getName - case dt: DecimalType => classOf[DecimalColumnAccessor].getName - case struct: StructType => classOf[StructColumnAccessor].getName - case array: ArrayType => classOf[ArrayColumnAccessor].getName - case t: MapType => classOf[MapColumnAccessor].getName - } - ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;") - - val createCode = dt match { - case t if ctx.isPrimitiveType(dt) => - s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" - case NullType | StringType | BinaryType => - s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" - case other => - s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder), - (${dt.getClass.getName}) columnTypes[$index]);""" - } - - val extract = s"$accessorName.extractTo(mutableRow, $index);" - val patch = dt match { - case DecimalType.Fixed(p, s) if p > Decimal.MAX_LONG_DIGITS => - // For large Decimal, it should have 16 bytes for future update even it's null now. 
- s""" - if (mutableRow.isNullAt($index)) { - rowWriter.write($index, (Decimal) null, $p, $s); - } - """ - case other => "" - } - (createCode, extract + patch) - }.unzip - - val code = s""" - import java.nio.ByteBuffer; - import java.nio.ByteOrder; - import scala.collection.Iterator; - import org.apache.spark.sql.types.DataType; - import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; - import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; - import org.apache.spark.sql.columnar.MutableUnsafeRow; - - public SpecificColumnarIterator generate($exprType[] expr) { - return new SpecificColumnarIterator(); - } - - class SpecificColumnarIterator extends ${classOf[ColumnarIterator].getName} { - - private ByteOrder nativeOrder = null; - private byte[][] buffers = null; - private UnsafeRow unsafeRow = new UnsafeRow(); - private BufferHolder bufferHolder = new BufferHolder(); - private UnsafeRowWriter rowWriter = new UnsafeRowWriter(); - private MutableUnsafeRow mutableRow = null; - - private int currentRow = 0; - private int numRowsInBatch = 0; - - private scala.collection.Iterator input = null; - private DataType[] columnTypes = null; - private int[] columnIndexes = null; - - ${declareMutableStates(ctx)} - - public SpecificColumnarIterator() { - this.nativeOrder = ByteOrder.nativeOrder(); - this.buffers = new byte[${columnTypes.length}][]; - this.mutableRow = new MutableUnsafeRow(rowWriter); - - ${initMutableStates(ctx)} - } - - public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) { - this.input = input; - this.columnTypes = columnTypes; - this.columnIndexes = columnIndexes; - } - - public boolean hasNext() { - if (currentRow < numRowsInBatch) { - return true; - } - if (!input.hasNext()) { - return false; - } - - ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next(); - currentRow = 0; - numRowsInBatch = batch.numRows(); - for (int i = 0; i < columnIndexes.length; i ++) { - buffers[i] = batch.buffers()[columnIndexes[i]]; - } - ${initializeAccessors.mkString("\n")} - - return hasNext(); - } - - public InternalRow next() { - currentRow += 1; - bufferHolder.reset(); - rowWriter.initialize(bufferHolder, $numFields); - ${extractors.mkString("\n")} - unsafeRow.pointTo(bufferHolder.buffer, $numFields, bufferHolder.totalSize()); - return unsafeRow; - } - }""" - - logDebug(s"Generated ColumnarIterator: ${CodeFormatter.format(code)}") - - compile(code).generate(ctx.references.toArray).asInstanceOf[ColumnarIterator] - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala deleted file mode 100644 index ae77298..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{ConvertToUnsafe, LeafNode, SparkPlan} -import org.apache.spark.sql.types.UserDefinedType -import org.apache.spark.storage.StorageLevel -import org.apache.spark.{Accumulable, Accumulator, Accumulators} - -private[sql] object InMemoryRelation { - def apply( - useCompression: Boolean, - batchSize: Int, - storageLevel: StorageLevel, - child: SparkPlan, - tableName: Option[String]): InMemoryRelation = - new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, - if (child.outputsUnsafeRows) child else ConvertToUnsafe(child), - tableName)() -} - -/** - * CachedBatch is a cached batch of rows. - * - * @param numRows The total number of rows in this batch - * @param buffers The buffers for serialized columns - * @param stats The stat of columns - */ -private[sql] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) - -private[sql] case class InMemoryRelation( - output: Seq[Attribute], - useCompression: Boolean, - batchSize: Int, - storageLevel: StorageLevel, - @transient child: SparkPlan, - tableName: Option[String])( - @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null, - @transient private var _statistics: Statistics = null, - private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null) - extends LogicalPlan with MultiInstanceRelation { - - private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = - if (_batchStats == null) { - child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow]) - } else { - _batchStats - } - - @transient val partitionStatistics = new PartitionStatistics(output) - - private def computeSizeInBytes = { - val sizeOfRow: Expression = - BindReferences.bindReference( - output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add), - partitionStatistics.schema) - - batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum - } - - // Statistics propagation contracts: - // 1. Non-null `_statistics` must reflect the actual statistics of the underlying data - // 2. Only propagate statistics when `_statistics` is non-null - private def statisticsToBePropagated = if (_statistics == null) { - val updatedStats = statistics - if (_statistics == null) null else updatedStats - } else { - _statistics - } - - override def statistics: Statistics = { - if (_statistics == null) { - if (batchStats.value.isEmpty) { - // Underlying columnar RDD hasn't been materialized, no useful statistics information - // available, return the default statistics. 
- Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) - } else { - // Underlying columnar RDD has been materialized, required information has also been - // collected via the `batchStats` accumulator, compute the final statistics, - // and update `_statistics`. - _statistics = Statistics(sizeInBytes = computeSizeInBytes) - _statistics - } - } else { - // Pre-computed statistics - _statistics - } - } - - // If the cached column buffers were not passed in, we calculate them in the constructor. - // As in Spark, the actual work of caching is lazy. - if (_cachedColumnBuffers == null) { - buildBuffers() - } - - def recache(): Unit = { - _cachedColumnBuffers.unpersist() - _cachedColumnBuffers = null - buildBuffers() - } - - private def buildBuffers(): Unit = { - val output = child.output - val cached = child.execute().mapPartitionsInternal { rowIterator => - new Iterator[CachedBatch] { - def next(): CachedBatch = { - val columnBuilders = output.map { attribute => - ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) - }.toArray - - var rowCount = 0 - var totalSize = 0L - while (rowIterator.hasNext && rowCount < batchSize - && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { - val row = rowIterator.next() - - // Added for SPARK-6082. This assertion can be useful for scenarios when something - // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM - // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat - // hard to decipher. - assert( - row.numFields == columnBuilders.size, - s"Row column number mismatch, expected ${output.size} columns, " + - s"but got ${row.numFields}." + - s"\nRow content: $row") - - var i = 0 - totalSize = 0 - while (i < row.numFields) { - columnBuilders(i).appendFrom(row, i) - totalSize += columnBuilders(i).columnStats.sizeInBytes - i += 1 - } - rowCount += 1 - } - - val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics) - .flatMap(_.values)) - - batchStats += stats - CachedBatch(rowCount, columnBuilders.map(_.build().array()), stats) - } - - def hasNext: Boolean = rowIterator.hasNext - } - }.persist(storageLevel) - - cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString)) - _cachedColumnBuffers = cached - } - - def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { - InMemoryRelation( - newOutput, useCompression, batchSize, storageLevel, child, tableName)( - _cachedColumnBuffers, statisticsToBePropagated, batchStats) - } - - override def children: Seq[LogicalPlan] = Seq.empty - - override def newInstance(): this.type = { - new InMemoryRelation( - output.map(_.newInstance()), - useCompression, - batchSize, - storageLevel, - child, - tableName)( - _cachedColumnBuffers, - statisticsToBePropagated, - batchStats).asInstanceOf[this.type] - } - - def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers - - override protected def otherCopyArgs: Seq[AnyRef] = - Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats) - - private[sql] def uncache(blocking: Boolean): Unit = { - Accumulators.remove(batchStats.id) - cachedColumnBuffers.unpersist(blocking) - _cachedColumnBuffers = null - } -} - -private[sql] case class InMemoryColumnarTableScan( - attributes: Seq[Attribute], - predicates: Seq[Expression], - @transient relation: InMemoryRelation) - extends LeafNode { - - override def output: Seq[Attribute] = attributes - - // The cached version does not change the outputPartitioning of the original 
SparkPlan. - override def outputPartitioning: Partitioning = relation.child.outputPartitioning - - // The cached version does not change the outputOrdering of the original SparkPlan. - override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering - - override def outputsUnsafeRows: Boolean = true - - private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a) - - // Returned filter predicate should return false iff it is impossible for the input expression - // to evaluate to `true' based on statistics collected about this partition batch. - @transient val buildFilter: PartialFunction[Expression, Expression] = { - case And(lhs: Expression, rhs: Expression) - if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) => - (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _) - - case Or(lhs: Expression, rhs: Expression) - if buildFilter.isDefinedAt(lhs) && buildFilter.isDefinedAt(rhs) => - buildFilter(lhs) || buildFilter(rhs) - - case EqualTo(a: AttributeReference, l: Literal) => - statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound - case EqualTo(l: Literal, a: AttributeReference) => - statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound - - case LessThan(a: AttributeReference, l: Literal) => statsFor(a).lowerBound < l - case LessThan(l: Literal, a: AttributeReference) => l < statsFor(a).upperBound - - case LessThanOrEqual(a: AttributeReference, l: Literal) => statsFor(a).lowerBound <= l - case LessThanOrEqual(l: Literal, a: AttributeReference) => l <= statsFor(a).upperBound - - case GreaterThan(a: AttributeReference, l: Literal) => l < statsFor(a).upperBound - case GreaterThan(l: Literal, a: AttributeReference) => statsFor(a).lowerBound < l - - case GreaterThanOrEqual(a: AttributeReference, l: Literal) => l <= statsFor(a).upperBound - case GreaterThanOrEqual(l: Literal, a: AttributeReference) => statsFor(a).lowerBound <= l - - case IsNull(a: Attribute) => statsFor(a).nullCount > 0 - case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0 - } - - val partitionFilters: Seq[Expression] = { - predicates.flatMap { p => - val filter = buildFilter.lift(p) - val boundFilter = - filter.map( - BindReferences.bindReference( - _, - relation.partitionStatistics.schema, - allowFailures = true)) - - boundFilter.foreach(_ => - filter.foreach(f => logInfo(s"Predicate $p generates partition filter: $f"))) - - // If the filter can't be resolved then we are missing required statistics. - boundFilter.filter(_.resolved) - } - } - - lazy val enableAccumulators: Boolean = - sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean - - // Accumulators used for testing purposes - lazy val readPartitions: Accumulator[Int] = sparkContext.accumulator(0) - lazy val readBatches: Accumulator[Int] = sparkContext.accumulator(0) - - private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning - - protected override def doExecute(): RDD[InternalRow] = { - if (enableAccumulators) { - readPartitions.setValue(0) - readBatches.setValue(0) - } - - // Using these variables here to avoid serialization of entire objects (if referenced directly) - // within the map Partitions closure. 
- val schema = relation.partitionStatistics.schema - val schemaIndex = schema.zipWithIndex - val relOutput = relation.output - val buffers = relation.cachedColumnBuffers - - buffers.mapPartitionsInternal { cachedBatchIterator => - val partitionFilter = newPredicate( - partitionFilters.reduceOption(And).getOrElse(Literal(true)), - schema) - - // Find the ordinals and data types of the requested columns. - val (requestedColumnIndices, requestedColumnDataTypes) = - attributes.map { a => - relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType - }.unzip - - // Do partition batch pruning if enabled - val cachedBatchesToScan = - if (inMemoryPartitionPruningEnabled) { - cachedBatchIterator.filter { cachedBatch => - if (!partitionFilter(cachedBatch.stats)) { - def statsString: String = schemaIndex.map { - case (a, i) => - val value = cachedBatch.stats.get(i, a.dataType) - s"${a.name}: $value" - }.mkString(", ") - logInfo(s"Skipping partition based on stats $statsString") - false - } else { - if (enableAccumulators) { - readBatches += 1 - } - true - } - } - } else { - cachedBatchIterator - } - - val columnTypes = requestedColumnDataTypes.map { - case udt: UserDefinedType[_] => udt.sqlType - case other => other - }.toArray - val columnarIterator = GenerateColumnAccessor.generate(columnTypes) - columnarIterator.initialize(cachedBatchesToScan, columnTypes, requestedColumnIndices.toArray) - if (enableAccumulators && columnarIterator.hasNext) { - readPartitions += 1 - } - columnarIterator - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala deleted file mode 100644 index 7eaecfe..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import java.nio.{ByteOrder, ByteBuffer} - -import org.apache.spark.sql.catalyst.expressions.MutableRow - -private[sql] trait NullableColumnAccessor extends ColumnAccessor { - private var nullsBuffer: ByteBuffer = _ - private var nullCount: Int = _ - private var seenNulls: Int = 0 - - private var nextNullIndex: Int = _ - private var pos: Int = 0 - - abstract override protected def initialize(): Unit = { - nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder()) - nullCount = ByteBufferHelper.getInt(nullsBuffer) - nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1 - pos = 0 - - underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4) - super.initialize() - } - - abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = { - if (pos == nextNullIndex) { - seenNulls += 1 - - if (seenNulls < nullCount) { - nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) - } - - row.setNullAt(ordinal) - } else { - super.extractTo(row, ordinal) - } - - pos += 1 - } - - abstract override def hasNext: Boolean = seenNulls < nullCount || super.hasNext -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala deleted file mode 100644 index 76cfddf..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.nio.{ByteBuffer, ByteOrder} - -import org.apache.spark.sql.catalyst.InternalRow - -/** - * A stackable trait used for building byte buffer for a column containing null values. Memory - * layout of the final byte buffer is: - * {{{ - * .------------------- Null count N (4 bytes) - * | .--------------- Null positions (4 x N bytes, empty if null count is zero) - * | | .--------- Non-null elements - * V V V - * +---+-----+---------+ - * | | ... | ... ... 
| - * +---+-----+---------+ - * }}} - */ -private[sql] trait NullableColumnBuilder extends ColumnBuilder { - protected var nulls: ByteBuffer = _ - protected var nullCount: Int = _ - private var pos: Int = _ - - abstract override def initialize( - initialSize: Int, - columnName: String, - useCompression: Boolean): Unit = { - - nulls = ByteBuffer.allocate(1024) - nulls.order(ByteOrder.nativeOrder()) - pos = 0 - nullCount = 0 - super.initialize(initialSize, columnName, useCompression) - } - - abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = { - columnStats.gatherStats(row, ordinal) - if (row.isNullAt(ordinal)) { - nulls = ColumnBuilder.ensureFreeSpace(nulls, 4) - nulls.putInt(pos) - nullCount += 1 - } else { - super.appendFrom(row, ordinal) - } - pos += 1 - } - - abstract override def build(): ByteBuffer = { - val nonNulls = super.build() - val nullDataLen = nulls.position() - - nulls.limit(nullDataLen) - nulls.rewind() - - val buffer = ByteBuffer - .allocate(4 + nullDataLen + nonNulls.remaining()) - .order(ByteOrder.nativeOrder()) - .putInt(nullCount) - .put(nulls) - .put(nonNulls) - - buffer.rewind() - buffer - } - - protected def buildNonNulls(): ByteBuffer = { - nulls.limit(nulls.position()).rewind() - super.build() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala deleted file mode 100644 index cb205de..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar.compression - -import org.apache.spark.sql.catalyst.expressions.MutableRow -import org.apache.spark.sql.columnar.{ColumnAccessor, NativeColumnAccessor} -import org.apache.spark.sql.types.AtomicType - -private[sql] trait CompressibleColumnAccessor[T <: AtomicType] extends ColumnAccessor { - this: NativeColumnAccessor[T] => - - private var decoder: Decoder[T] = _ - - abstract override protected def initialize(): Unit = { - super.initialize() - decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType) - } - - abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext - - override def extractSingle(row: MutableRow, ordinal: Int): Unit = { - decoder.next(row, ordinal) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala deleted file mode 100644 index 161021f..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar.compression - -import java.nio.{ByteBuffer, ByteOrder} - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.columnar.{ColumnBuilder, NativeColumnBuilder} -import org.apache.spark.sql.types.AtomicType - -/** - * A stackable trait that builds optionally compressed byte buffer for a column. Memory layout of - * the final byte buffer is: - * {{{ - * .----------------------- Null count N (4 bytes) - * | .------------------- Null positions (4 x N bytes, empty if null count is zero) - * | | .------------- Compression scheme ID (4 bytes) - * | | | .--------- Compressed non-null elements - * V V V V - * +---+-----+---+---------+ - * | | ... | | ... ... 
| - * +---+-----+---+---------+ - * \-------/ \-----------/ - * header body - * }}} - */ -private[sql] trait CompressibleColumnBuilder[T <: AtomicType] - extends ColumnBuilder with Logging { - - this: NativeColumnBuilder[T] with WithCompressionSchemes => - - var compressionEncoders: Seq[Encoder[T]] = _ - - abstract override def initialize( - initialSize: Int, - columnName: String, - useCompression: Boolean): Unit = { - - compressionEncoders = - if (useCompression) { - schemes.filter(_.supports(columnType)).map(_.encoder[T](columnType)) - } else { - Seq(PassThrough.encoder(columnType)) - } - super.initialize(initialSize, columnName, useCompression) - } - - protected def isWorthCompressing(encoder: Encoder[T]) = { - encoder.compressionRatio < 0.8 - } - - private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { - var i = 0 - while (i < compressionEncoders.length) { - compressionEncoders(i).gatherCompressibilityStats(row, ordinal) - i += 1 - } - } - - abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = { - super.appendFrom(row, ordinal) - if (!row.isNullAt(ordinal)) { - gatherCompressibilityStats(row, ordinal) - } - } - - override def build(): ByteBuffer = { - val nonNullBuffer = buildNonNulls() - val encoder: Encoder[T] = { - val candidate = compressionEncoders.minBy(_.compressionRatio) - if (isWorthCompressing(candidate)) candidate else PassThrough.encoder(columnType) - } - - // Header = null count + null positions - val headerSize = 4 + nulls.limit() - val compressedSize = if (encoder.compressedSize == 0) { - nonNullBuffer.remaining() - } else { - encoder.compressedSize - } - - val compressedBuffer = ByteBuffer - // Reserves 4 bytes for compression scheme ID - .allocate(headerSize + 4 + compressedSize) - .order(ByteOrder.nativeOrder) - // Write the header - .putInt(nullCount) - .put(nulls) - - logDebug(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}") - encoder.compress(nonNullBuffer, compressedBuffer) - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
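For readers following the moved code, a few standalone Scala sketches of the techniques it uses follow; every type and name in them is a simplified, hypothetical stand-in rather than a Spark class. First, the control flow that the generated SpecificColumnarIterator follows: hasNext() pulls the next CachedBatch and rebuilds one stateful accessor per requested column, and next() has each accessor write its value into the output row before returning it.

// Hand-written equivalent of the generated iterator's control flow.
// Batch, Row, numRowsIn and makeExtractors are hypothetical stand-ins; the
// generated code also reuses a single UnsafeRow instead of allocating a new
// row on every call.
class SimpleColumnarIterator[Batch, Row](
    input: Iterator[Batch],
    numRowsIn: Batch => Int,
    makeExtractors: Batch => Seq[Row => Unit],
    newRow: () => Row) extends Iterator[Row] {

  private var extractors: Seq[Row => Unit] = Nil
  private var currentRow = 0
  private var numRowsInBatch = 0

  override def hasNext: Boolean = {
    if (currentRow < numRowsInBatch) {
      true
    } else if (!input.hasNext) {
      false
    } else {
      // Load the next batch and rebuild one stateful extractor per column.
      val batch = input.next()
      currentRow = 0
      numRowsInBatch = numRowsIn(batch)
      extractors = makeExtractors(batch)
      hasNext
    }
  }

  override def next(): Row = {
    currentRow += 1
    val row = newRow()
    extractors.foreach(_.apply(row))
    row
  }
}

The recursive hasNext mirrors the generated code, which calls hasNext() again after loading a batch so that empty batches are skipped transparently.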
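InMemoryRelation.buildBuffers drains the child plan's rows into CachedBatches, cutting a batch once either the configured row count (batchSize) or the byte-size cap is reached. A minimal sketch of that batching loop, with Batch and maxBatchBytes as simplified stand-ins for CachedBatch, the per-column ColumnBuilders and MAX_BATCH_SIZE_IN_BYTE:

object BatchingSketch {
  val batchSize = 10000        // rows per batch, in the spirit of spark.sql.inMemoryColumnarStorage.batchSize
  val maxBatchBytes = 4L << 20 // illustrative byte cap, standing in for MAX_BATCH_SIZE_IN_BYTE

  case class Batch(numRows: Int, bytes: Array[Byte])

  // Cut a new batch whenever the row-count limit or the byte-size cap is hit.
  def toBatches(rows: Iterator[Array[Byte]]): Iterator[Batch] = new Iterator[Batch] {
    override def hasNext: Boolean = rows.hasNext
    override def next(): Batch = {
      val builder = Array.newBuilder[Byte]
      var rowCount = 0
      var totalSize = 0L
      while (rows.hasNext && rowCount < batchSize && totalSize < maxBatchBytes) {
        val row = rows.next()
        builder ++= row          // stand-in for appending each field to its ColumnBuilder
        totalSize += row.length
        rowCount += 1
      }
      Batch(rowCount, builder.result())
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator.fill(25000)(new Array[Byte](8))
    val batches = toBatches(rows).toSeq
    assert(batches.map(_.numRows).sum == 25000) // 10000 + 10000 + 5000
  }
}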
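InMemoryColumnarTableScan.buildFilter turns query predicates into checks against per-batch lower/upper bound statistics, so a cached batch is skipped only when its stats prove the predicate cannot be true for any of its rows. A self-contained sketch of that pruning rule for an Int column, with hypothetical BatchStats and Pred types in place of the partition statistics schema and Catalyst expressions:

// Stats collected per cached batch: min, max, null count and row count.
case class BatchStats(lowerBound: Int, upperBound: Int, nullCount: Int, count: Int)

sealed trait Pred
case class EqualTo(value: Int) extends Pred
case class LessThan(value: Int) extends Pred
case class GreaterThan(value: Int) extends Pred

object BatchPruningSketch {
  // True when the batch *might* contain matching rows and must be scanned;
  // false only when the min/max stats make a match impossible.
  def mightMatch(stats: BatchStats, pred: Pred): Boolean = pred match {
    case EqualTo(v)     => stats.lowerBound <= v && v <= stats.upperBound
    case LessThan(v)    => stats.lowerBound < v
    case GreaterThan(v) => v < stats.upperBound
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(BatchStats(0, 9, 0, 100), BatchStats(10, 19, 0, 100))
    // "column > 15" prunes the first batch and keeps the second.
    assert(batches.filter(mightMatch(_, GreaterThan(15))) == Seq(BatchStats(10, 19, 0, 100)))
  }
}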
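NullableColumnBuilder documents the byte layout it emits: a 4-byte null count, then the null positions, then the non-null elements, which NullableColumnAccessor walks back on read. A standalone round-trip sketch of the same layout for an Int column, illustration only and not the Spark ColumnBuilder/ColumnAccessor code:

import java.nio.{ByteBuffer, ByteOrder}

object NullableLayoutSketch {
  // [null count N (4 bytes)][N null positions (4 bytes each)][non-null values].
  def encode(values: Seq[Option[Int]]): ByteBuffer = {
    val nullPositions = values.zipWithIndex.collect { case (None, i) => i }
    val nonNulls = values.flatten
    val buffer = ByteBuffer
      .allocate(4 + 4 * nullPositions.size + 4 * nonNulls.size)
      .order(ByteOrder.nativeOrder())
    buffer.putInt(nullPositions.size)
    nullPositions.foreach(p => buffer.putInt(p))
    nonNulls.foreach(v => buffer.putInt(v))
    buffer.rewind()
    buffer
  }

  // Read the null section first, then interleave nulls and non-null values.
  def decode(buffer: ByteBuffer, numRows: Int): Seq[Option[Int]] = {
    val nullCount = buffer.getInt()
    val nullPositions = Seq.fill(nullCount)(buffer.getInt()).toSet
    (0 until numRows).map { pos =>
      if (nullPositions.contains(pos)) None else Some(buffer.getInt())
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Some(1), None, Some(3), None, Some(5))
    assert(decode(encode(rows), rows.size) == rows)
  }
}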
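CompressibleColumnBuilder.build chooses the candidate encoder with the best compression ratio and keeps it only if isWorthCompressing holds (ratio below 0.8); otherwise it falls back to PassThrough. A minimal sketch of that selection rule, with a hypothetical EncoderStats stand-in for the real Encoder trait:

// Simplified stand-in for an encoder that has gathered compressibility stats.
case class EncoderStats(name: String, uncompressedSize: Long, compressedSize: Long) {
  def compressionRatio: Double =
    if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
}

object EncoderSelectionSketch {
  val passThrough = EncoderStats("PassThrough", 0, 0) // ratio 1.0: never shrinks the data

  // The best candidate wins only if it clears the 0.8 worth-compressing threshold.
  def chooseEncoder(candidates: Seq[EncoderStats]): EncoderStats = {
    val best = candidates.minBy(_.compressionRatio)
    if (best.compressionRatio < 0.8) best else passThrough
  }

  def main(args: Array[String]): Unit = {
    val candidates = Seq(
      EncoderStats("RunLengthEncoding", 1000, 400),  // ratio 0.4
      EncoderStats("DictionaryEncoding", 1000, 900)) // ratio 0.9
    assert(chooseEncoder(candidates).name == "RunLengthEncoding")
  }
}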