spark-commits mailing list archives

From r...@apache.org
Subject [5/5] spark git commit: [SPARK-11858][SQL] Move sql.columnar into sql.execution.
Date Thu, 19 Nov 2015 22:48:26 GMT
[SPARK-11858][SQL] Move sql.columnar into sql.execution.

In addition, this tightens the visibility of many classes in the columnar package from private[sql] to private[columnar].

Author: Reynold Xin <rxin@databricks.com>

Closes #9842 from rxin/SPARK-11858.
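
The change below is mechanical: each file moves from org.apache.spark.sql.columnar to
org.apache.spark.sql.execution.columnar, and many declarations narrow their visibility. A hedged
sketch of the pattern (the trait name is taken from the diff; the exact post-move modifiers should
be read from the new files rather than from this illustration):

  // Before: sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
  package org.apache.spark.sql.columnar

  private[sql] trait ColumnAccessor { /* ... */ }

  // After: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
  package org.apache.spark.sql.execution.columnar

  private[columnar] trait ColumnAccessor { /* ... */ }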


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/014c0f7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/014c0f7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/014c0f7a

Branch: refs/heads/master
Commit: 014c0f7a9dfdb1686fa9aeacaadb2a17a855a943
Parents: 599a8c6
Author: Reynold Xin <rxin@databricks.com>
Authored: Thu Nov 19 14:48:18 2015 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Nov 19 14:48:18 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/columnar/ColumnAccessor.scala     | 148 ----
 .../spark/sql/columnar/ColumnBuilder.scala      | 189 -----
 .../apache/spark/sql/columnar/ColumnStats.scala | 271 --------
 .../apache/spark/sql/columnar/ColumnType.scala  | 689 -------------------
 .../sql/columnar/GenerateColumnAccessor.scala   | 195 ------
 .../columnar/InMemoryColumnarTableScan.scala    | 345 ----------
 .../sql/columnar/NullableColumnAccessor.scala   |  59 --
 .../sql/columnar/NullableColumnBuilder.scala    |  88 ---
 .../CompressibleColumnAccessor.scala            |  39 --
 .../compression/CompressibleColumnBuilder.scala | 109 ---
 .../compression/CompressionScheme.scala         |  81 ---
 .../compression/compressionSchemes.scala        | 532 --------------
 .../spark/sql/execution/CacheManager.scala      |   2 +-
 .../spark/sql/execution/SparkStrategies.scala   |   2 +-
 .../sql/execution/columnar/ColumnAccessor.scala | 148 ++++
 .../sql/execution/columnar/ColumnBuilder.scala  | 194 ++++++
 .../sql/execution/columnar/ColumnStats.scala    | 271 ++++++++
 .../sql/execution/columnar/ColumnType.scala     | 689 +++++++++++++++++++
 .../columnar/GenerateColumnAccessor.scala       | 195 ++++++
 .../columnar/InMemoryColumnarTableScan.scala    | 346 ++++++++++
 .../columnar/NullableColumnAccessor.scala       |  59 ++
 .../columnar/NullableColumnBuilder.scala        |  88 +++
 .../CompressibleColumnAccessor.scala            |  39 ++
 .../compression/CompressibleColumnBuilder.scala | 109 +++
 .../compression/CompressionScheme.scala         |  81 +++
 .../compression/compressionSchemes.scala        | 532 ++++++++++++++
 .../apache/spark/sql/execution/package.scala    |   2 +
 .../org/apache/spark/sql/CachedTableSuite.scala |   4 +-
 .../scala/org/apache/spark/sql/QueryTest.scala  |   2 +-
 .../spark/sql/columnar/ColumnStatsSuite.scala   | 110 ---
 .../spark/sql/columnar/ColumnTypeSuite.scala    | 145 ----
 .../spark/sql/columnar/ColumnarTestUtils.scala  | 108 ---
 .../columnar/InMemoryColumnarQuerySuite.scala   | 222 ------
 .../columnar/NullableColumnAccessorSuite.scala  |  92 ---
 .../columnar/NullableColumnBuilderSuite.scala   | 107 ---
 .../columnar/PartitionBatchPruningSuite.scala   | 127 ----
 .../compression/BooleanBitSetSuite.scala        | 107 ---
 .../compression/DictionaryEncodingSuite.scala   | 128 ----
 .../compression/IntegralDeltaSuite.scala        | 131 ----
 .../compression/RunLengthEncodingSuite.scala    | 114 ---
 .../TestCompressibleColumnBuilder.scala         |  44 --
 .../execution/columnar/ColumnStatsSuite.scala   | 110 +++
 .../execution/columnar/ColumnTypeSuite.scala    | 145 ++++
 .../execution/columnar/ColumnarTestUtils.scala  | 108 +++
 .../columnar/InMemoryColumnarQuerySuite.scala   | 222 ++++++
 .../columnar/NullableColumnAccessorSuite.scala  |  92 +++
 .../columnar/NullableColumnBuilderSuite.scala   | 107 +++
 .../columnar/PartitionBatchPruningSuite.scala   | 127 ++++
 .../compression/BooleanBitSetSuite.scala        | 107 +++
 .../compression/DictionaryEncodingSuite.scala   | 128 ++++
 .../compression/IntegralDeltaSuite.scala        | 131 ++++
 .../compression/RunLengthEncodingSuite.scala    | 114 +++
 .../TestCompressibleColumnBuilder.scala         |  44 ++
 .../spark/sql/hive/CachedTableSuite.scala       |   2 +-
 54 files changed, 4194 insertions(+), 4186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
deleted file mode 100644
index 42ec4d3..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow}
-import org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor
-import org.apache.spark.sql.types._
-
-/**
- * An `Iterator`-like trait used to extract values from a columnar byte buffer. When a value is
- * extracted from the buffer, instead of being returned directly, it is set into a field of a
- * [[MutableRow]]. In this way, boxing costs can be avoided by leveraging the primitive setter
- * methods provided by [[MutableRow]].
- */
-private[sql] trait ColumnAccessor {
-  initialize()
-
-  protected def initialize()
-
-  def hasNext: Boolean
-
-  def extractTo(row: MutableRow, ordinal: Int)
-
-  protected def underlyingBuffer: ByteBuffer
-}
-
-private[sql] abstract class BasicColumnAccessor[JvmType](
-    protected val buffer: ByteBuffer,
-    protected val columnType: ColumnType[JvmType])
-  extends ColumnAccessor {
-
-  protected def initialize() {}
-
-  override def hasNext: Boolean = buffer.hasRemaining
-
-  override def extractTo(row: MutableRow, ordinal: Int): Unit = {
-    extractSingle(row, ordinal)
-  }
-
-  def extractSingle(row: MutableRow, ordinal: Int): Unit = {
-    columnType.extract(buffer, row, ordinal)
-  }
-
-  protected def underlyingBuffer = buffer
-}
-
-private[sql] class NullColumnAccessor(buffer: ByteBuffer)
-  extends BasicColumnAccessor[Any](buffer, NULL)
-  with NullableColumnAccessor
-
-private[sql] abstract class NativeColumnAccessor[T <: AtomicType](
-    override protected val buffer: ByteBuffer,
-    override protected val columnType: NativeColumnType[T])
-  extends BasicColumnAccessor(buffer, columnType)
-  with NullableColumnAccessor
-  with CompressibleColumnAccessor[T]
-
-private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, BOOLEAN)
-
-private[sql] class ByteColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, BYTE)
-
-private[sql] class ShortColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, SHORT)
-
-private[sql] class IntColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, INT)
-
-private[sql] class LongColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, LONG)
-
-private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, FLOAT)
-
-private[sql] class DoubleColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, DOUBLE)
-
-private[sql] class StringColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, STRING)
-
-private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
-  extends BasicColumnAccessor[Array[Byte]](buffer, BINARY)
-  with NullableColumnAccessor
-
-private[sql] class CompactDecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType)
-  extends NativeColumnAccessor(buffer, COMPACT_DECIMAL(dataType))
-
-private[sql] class DecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType)
-  extends BasicColumnAccessor[Decimal](buffer, LARGE_DECIMAL(dataType))
-  with NullableColumnAccessor
-
-private[sql] class StructColumnAccessor(buffer: ByteBuffer, dataType: StructType)
-  extends BasicColumnAccessor[UnsafeRow](buffer, STRUCT(dataType))
-  with NullableColumnAccessor
-
-private[sql] class ArrayColumnAccessor(buffer: ByteBuffer, dataType: ArrayType)
-  extends BasicColumnAccessor[UnsafeArrayData](buffer, ARRAY(dataType))
-  with NullableColumnAccessor
-
-private[sql] class MapColumnAccessor(buffer: ByteBuffer, dataType: MapType)
-  extends BasicColumnAccessor[UnsafeMapData](buffer, MAP(dataType))
-  with NullableColumnAccessor
-
-private[sql] object ColumnAccessor {
-  def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = {
-    val buf = buffer.order(ByteOrder.nativeOrder)
-
-    dataType match {
-      case NullType => new NullColumnAccessor(buf)
-      case BooleanType => new BooleanColumnAccessor(buf)
-      case ByteType => new ByteColumnAccessor(buf)
-      case ShortType => new ShortColumnAccessor(buf)
-      case IntegerType | DateType => new IntColumnAccessor(buf)
-      case LongType | TimestampType => new LongColumnAccessor(buf)
-      case FloatType => new FloatColumnAccessor(buf)
-      case DoubleType => new DoubleColumnAccessor(buf)
-      case StringType => new StringColumnAccessor(buf)
-      case BinaryType => new BinaryColumnAccessor(buf)
-      case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS =>
-        new CompactDecimalColumnAccessor(buf, dt)
-      case dt: DecimalType => new DecimalColumnAccessor(buf, dt)
-      case struct: StructType => new StructColumnAccessor(buf, struct)
-      case array: ArrayType => new ArrayColumnAccessor(buf, array)
-      case map: MapType => new MapColumnAccessor(buf, map)
-      case udt: UserDefinedType[_] => ColumnAccessor(udt.sqlType, buffer)
-      case other =>
-        throw new Exception(s"Unsupported type: $other")
-    }
-  }
-}
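
The accessor above pairs with the ColumnBuilder deleted in the next file: the builder produces the
columnar ByteBuffer, and ColumnAccessor.apply picks the matching accessor to read it back into a
MutableRow. A minimal sketch of that round trip, assuming it lives inside the org.apache.spark.sql
package (these classes are package-private) and using the pre-move package names shown in this
diff; after this commit the same imports come from org.apache.spark.sql.execution.columnar:

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
  import org.apache.spark.sql.columnar.{ColumnAccessor, ColumnBuilder}
  import org.apache.spark.sql.types.IntegerType

  // Build a columnar buffer for a single Int column, then read it back.
  val builder = ColumnBuilder(IntegerType, initialSize = 4)
  Seq(InternalRow(1), InternalRow(2), InternalRow(3)).foreach(builder.appendFrom(_, 0))

  val accessor = ColumnAccessor(IntegerType, builder.build())
  val row = new SpecificMutableRow(Seq(IntegerType))
  while (accessor.hasNext) {
    accessor.extractTo(row, 0)   // sets row(0) via the primitive setter, avoiding boxing
    println(row.getInt(0))
  }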

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
deleted file mode 100644
index 599f30f..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.columnar.ColumnBuilder._
-import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}
-import org.apache.spark.sql.types._
-
-private[sql] trait ColumnBuilder {
-  /**
-   * Initializes with an approximate lower bound on the expected number of elements in this column.
-   */
-  def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false)
-
-  /**
-   * Appends `row(ordinal)` to the column builder.
-   */
-  def appendFrom(row: InternalRow, ordinal: Int)
-
-  /**
-   * Column statistics information
-   */
-  def columnStats: ColumnStats
-
-  /**
-   * Returns the final columnar byte buffer.
-   */
-  def build(): ByteBuffer
-}
-
-private[sql] class BasicColumnBuilder[JvmType](
-    val columnStats: ColumnStats,
-    val columnType: ColumnType[JvmType])
-  extends ColumnBuilder {
-
-  protected var columnName: String = _
-
-  protected var buffer: ByteBuffer = _
-
-  override def initialize(
-      initialSize: Int,
-      columnName: String = "",
-      useCompression: Boolean = false): Unit = {
-
-    val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
-    this.columnName = columnName
-
-    buffer = ByteBuffer.allocate(size * columnType.defaultSize)
-    buffer.order(ByteOrder.nativeOrder())
-  }
-
-  override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
-    buffer = ensureFreeSpace(buffer, columnType.actualSize(row, ordinal))
-    columnType.append(row, ordinal, buffer)
-  }
-
-  override def build(): ByteBuffer = {
-    if (buffer.capacity() > buffer.position() * 1.1) {
-      // trim the buffer
-      buffer = ByteBuffer
-        .allocate(buffer.position())
-        .order(ByteOrder.nativeOrder())
-        .put(buffer.array(), 0, buffer.position())
-    }
-    buffer.flip().asInstanceOf[ByteBuffer]
-  }
-}
-
-private[sql] class NullColumnBuilder
-  extends BasicColumnBuilder[Any](new ObjectColumnStats(NullType), NULL)
-  with NullableColumnBuilder
-
-private[sql] abstract class ComplexColumnBuilder[JvmType](
-    columnStats: ColumnStats,
-    columnType: ColumnType[JvmType])
-  extends BasicColumnBuilder[JvmType](columnStats, columnType)
-  with NullableColumnBuilder
-
-private[sql] abstract class NativeColumnBuilder[T <: AtomicType](
-    override val columnStats: ColumnStats,
-    override val columnType: NativeColumnType[T])
-  extends BasicColumnBuilder[T#InternalType](columnStats, columnType)
-  with NullableColumnBuilder
-  with AllCompressionSchemes
-  with CompressibleColumnBuilder[T]
-
-private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
-
-private[sql] class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE)
-
-private[sql] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT)
-
-private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)
-
-private[sql] class LongColumnBuilder extends NativeColumnBuilder(new LongColumnStats, LONG)
-
-private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT)
-
-private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE)
-
-private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)
-
-private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY)
-
-private[sql] class CompactDecimalColumnBuilder(dataType: DecimalType)
-  extends NativeColumnBuilder(new DecimalColumnStats(dataType), COMPACT_DECIMAL(dataType))
-
-private[sql] class DecimalColumnBuilder(dataType: DecimalType)
-  extends ComplexColumnBuilder(new DecimalColumnStats(dataType), LARGE_DECIMAL(dataType))
-
-private[sql] class StructColumnBuilder(dataType: StructType)
-  extends ComplexColumnBuilder(new ObjectColumnStats(dataType), STRUCT(dataType))
-
-private[sql] class ArrayColumnBuilder(dataType: ArrayType)
-  extends ComplexColumnBuilder(new ObjectColumnStats(dataType), ARRAY(dataType))
-
-private[sql] class MapColumnBuilder(dataType: MapType)
-  extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType))
-
-private[sql] object ColumnBuilder {
-  val DEFAULT_INITIAL_BUFFER_SIZE = 128 * 1024
-  val MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024L
-
-  private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
-    if (orig.remaining >= size) {
-      orig
-    } else {
-      // grow by at least the current capacity (and by at least the requested size)
-      val capacity = orig.capacity()
-      val newSize = capacity + size.max(capacity)
-      val pos = orig.position()
-
-      ByteBuffer
-        .allocate(newSize)
-        .order(ByteOrder.nativeOrder())
-        .put(orig.array(), 0, pos)
-    }
-  }
-
-  def apply(
-      dataType: DataType,
-      initialSize: Int = 0,
-      columnName: String = "",
-      useCompression: Boolean = false): ColumnBuilder = {
-    val builder: ColumnBuilder = dataType match {
-      case NullType => new NullColumnBuilder
-      case BooleanType => new BooleanColumnBuilder
-      case ByteType => new ByteColumnBuilder
-      case ShortType => new ShortColumnBuilder
-      case IntegerType | DateType => new IntColumnBuilder
-      case LongType | TimestampType => new LongColumnBuilder
-      case FloatType => new FloatColumnBuilder
-      case DoubleType => new DoubleColumnBuilder
-      case StringType => new StringColumnBuilder
-      case BinaryType => new BinaryColumnBuilder
-      case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS =>
-        new CompactDecimalColumnBuilder(dt)
-      case dt: DecimalType => new DecimalColumnBuilder(dt)
-      case struct: StructType => new StructColumnBuilder(struct)
-      case array: ArrayType => new ArrayColumnBuilder(array)
-      case map: MapType => new MapColumnBuilder(map)
-      case udt: UserDefinedType[_] =>
-        return apply(udt.sqlType, initialSize, columnName, useCompression)
-      case other =>
-        throw new Exception(s"Unsupported type: $other")
-    }
-
-    builder.initialize(initialSize, columnName, useCompression)
-    builder
-  }
-}
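
The `ensureFreeSpace` helper above grows the buffer by at least its current capacity (so it at
least doubles) whenever an append would not fit. A small standalone sketch of the same growth
arithmetic, using only java.nio and none of the Spark-internal types:

  import java.nio.{ByteBuffer, ByteOrder}

  def ensureFreeSpace(orig: ByteBuffer, size: Int): ByteBuffer = {
    if (orig.remaining >= size) orig
    else {
      // new capacity = old capacity + max(requested size, old capacity)
      val newSize = orig.capacity() + math.max(size, orig.capacity())
      val pos = orig.position()
      ByteBuffer.allocate(newSize).order(ByteOrder.nativeOrder()).put(orig.array(), 0, pos)
    }
  }

  var buf = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder())
  buf.putInt(1).putInt(2)          // 8 bytes used, buffer is now full
  buf = ensureFreeSpace(buf, 4)    // capacity grows from 8 to 16 bytes
  buf.putInt(3)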

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
deleted file mode 100644
index 91a0565..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Attribute, AttributeMap, AttributeReference}
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
-  val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)()
-  val lowerBound = AttributeReference(a.name + ".lowerBound", a.dataType, nullable = true)()
-  val nullCount = AttributeReference(a.name + ".nullCount", IntegerType, nullable = false)()
-  val count = AttributeReference(a.name + ".count", IntegerType, nullable = false)()
-  val sizeInBytes = AttributeReference(a.name + ".sizeInBytes", LongType, nullable = false)()
-
-  val schema = Seq(lowerBound, upperBound, nullCount, count, sizeInBytes)
-}
-
-private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {
-  val (forAttribute, schema) = {
-    val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a))
-    (AttributeMap(allStats), allStats.map(_._2.schema).foldLeft(Seq.empty[Attribute])(_ ++ _))
-  }
-}
-
-/**
- * Used to collect statistical information when building in-memory columns.
- *
- * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]`
- * brings a significant performance penalty.
- */
-private[sql] sealed trait ColumnStats extends Serializable {
-  protected var count = 0
-  protected var nullCount = 0
-  private[sql] var sizeInBytes = 0L
-
-  /**
-   * Gathers statistics information from `row(ordinal)`.
-   */
-  def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    if (row.isNullAt(ordinal)) {
-      nullCount += 1
-      // 4 bytes for null position
-      sizeInBytes += 4
-    }
-    count += 1
-  }
-
-  /**
-   * Column statistics represented as a single row, currently including closed lower bound, closed
-   * upper bound and null count.
-   */
-  def collectedStatistics: GenericInternalRow
-}
-
-/**
- * A no-op ColumnStats only used for testing purposes.
- */
-private[sql] class NoopColumnStats extends ColumnStats {
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = super.gatherStats(row, ordinal)
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](null, null, nullCount, count, 0L))
-}
-
-private[sql] class BooleanColumnStats extends ColumnStats {
-  protected var upper = false
-  protected var lower = true
-
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row.getBoolean(ordinal)
-      if (value > upper) upper = value
-      if (value < lower) lower = value
-      sizeInBytes += BOOLEAN.defaultSize
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
-}
-
-private[sql] class ByteColumnStats extends ColumnStats {
-  protected var upper = Byte.MinValue
-  protected var lower = Byte.MaxValue
-
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row.getByte(ordinal)
-      if (value > upper) upper = value
-      if (value < lower) lower = value
-      sizeInBytes += BYTE.defaultSize
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
-}
-
-private[sql] class ShortColumnStats extends ColumnStats {
-  protected var upper = Short.MinValue
-  protected var lower = Short.MaxValue
-
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row.getShort(ordinal)
-      if (value > upper) upper = value
-      if (value < lower) lower = value
-      sizeInBytes += SHORT.defaultSize
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
-}
-
-private[sql] class IntColumnStats extends ColumnStats {
-  protected var upper = Int.MinValue
-  protected var lower = Int.MaxValue
-
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row.getInt(ordinal)
-      if (value > upper) upper = value
-      if (value < lower) lower = value
-      sizeInBytes += INT.defaultSize
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
-}
-
-private[sql] class LongColumnStats extends ColumnStats {
-  protected var upper = Long.MinValue
-  protected var lower = Long.MaxValue
-
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row.getLong(ordinal)
-      if (value > upper) upper = value
-      if (value < lower) lower = value
-      sizeInBytes += LONG.defaultSize
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
-}
-
-private[sql] class FloatColumnStats extends ColumnStats {
-  protected var upper = Float.MinValue
-  protected var lower = Float.MaxValue
-
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row.getFloat(ordinal)
-      if (value > upper) upper = value
-      if (value < lower) lower = value
-      sizeInBytes += FLOAT.defaultSize
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
-}
-
-private[sql] class DoubleColumnStats extends ColumnStats {
-  protected var upper = Double.MinValue
-  protected var lower = Double.MaxValue
-
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row.getDouble(ordinal)
-      if (value > upper) upper = value
-      if (value < lower) lower = value
-      sizeInBytes += DOUBLE.defaultSize
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
-}
-
-private[sql] class StringColumnStats extends ColumnStats {
-  protected var upper: UTF8String = null
-  protected var lower: UTF8String = null
-
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row.getUTF8String(ordinal)
-      if (upper == null || value.compareTo(upper) > 0) upper = value.clone()
-      if (lower == null || value.compareTo(lower) < 0) lower = value.clone()
-      sizeInBytes += STRING.actualSize(row, ordinal)
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
-}
-
-private[sql] class BinaryColumnStats extends ColumnStats {
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      sizeInBytes += BINARY.actualSize(row, ordinal)
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes))
-}
-
-private[sql] class DecimalColumnStats(precision: Int, scale: Int) extends ColumnStats {
-  def this(dt: DecimalType) = this(dt.precision, dt.scale)
-
-  protected var upper: Decimal = null
-  protected var lower: Decimal = null
-
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row.getDecimal(ordinal, precision, scale)
-      if (upper == null || value.compareTo(upper) > 0) upper = value
-      if (lower == null || value.compareTo(lower) < 0) lower = value
-      // TODO: this is not right for DecimalType with precision > 18
-      sizeInBytes += 8
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
-}
-
-private[sql] class ObjectColumnStats(dataType: DataType) extends ColumnStats {
-  val columnType = ColumnType(dataType)
-
-  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      sizeInBytes += columnType.actualSize(row, ordinal)
-    }
-  }
-
-  override def collectedStatistics: GenericInternalRow =
-    new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes))
-}
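
Each stats class above accumulates a five-field row matching ColumnStatisticsSchema: (lower bound,
upper bound, null count, element count, size in bytes). A minimal sketch of how gatherStats feeds
collectedStatistics, again assuming the snippet lives inside the org.apache.spark.sql package since
these classes are package-private:

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.columnar.IntColumnStats

  val stats = new IntColumnStats
  Seq(InternalRow(3), InternalRow(7), InternalRow(null)).foreach(stats.gatherStats(_, 0))

  val s = stats.collectedStatistics
  // (lowerBound, upperBound, nullCount, count, sizeInBytes) -- expected (3, 7, 1, 3, 12):
  // two 4-byte ints plus 4 bytes reserved for the null position
  println((s.getInt(0), s.getInt(1), s.getInt(2), s.getInt(3), s.getLong(4)))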

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
deleted file mode 100644
index 68e509e..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ /dev/null
@@ -1,689 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import java.math.{BigDecimal, BigInteger}
-import java.nio.ByteBuffer
-
-import scala.reflect.runtime.universe.TypeTag
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.Platform
-import org.apache.spark.unsafe.types.UTF8String
-
-
-/**
- * A helper class for fast reads of Int/Long/Float/Double from a ByteBuffer in native byte order.
- *
- * Note: There is not much difference between ByteBuffer.getByte/getShort and
- * Unsafe.getByte/getShort, so we do not have helper methods for them.
- *
- * The unrolling (building columnar cache) is already slow, putLong/putDouble will not help much,
- * so we do not have helper methods for them.
- *
- *
- * WARNING: This only works with HeapByteBuffer.
- */
-object ByteBufferHelper {
-  def getInt(buffer: ByteBuffer): Int = {
-    val pos = buffer.position()
-    buffer.position(pos + 4)
-    Platform.getInt(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos)
-  }
-
-  def getLong(buffer: ByteBuffer): Long = {
-    val pos = buffer.position()
-    buffer.position(pos + 8)
-    Platform.getLong(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos)
-  }
-
-  def getFloat(buffer: ByteBuffer): Float = {
-    val pos = buffer.position()
-    buffer.position(pos + 4)
-    Platform.getFloat(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos)
-  }
-
-  def getDouble(buffer: ByteBuffer): Double = {
-    val pos = buffer.position()
-    buffer.position(pos + 8)
-    Platform.getDouble(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos)
-  }
-}
-
-/**
- * An abstract class that represents the type of a column. Used to append/extract Java objects into/from
- * the underlying [[ByteBuffer]] of a column.
- *
- * @tparam JvmType Underlying Java type to represent the elements.
- */
-private[sql] sealed abstract class ColumnType[JvmType] {
-
-  // The catalyst data type of this column.
-  def dataType: DataType
-
-  // Default size in bytes for one element of type T (e.g. 4 for `Int`).
-  def defaultSize: Int
-
-  /**
-   * Extracts a value out of the buffer at the buffer's current position.
-   */
-  def extract(buffer: ByteBuffer): JvmType
-
-  /**
- * Extracts a value out of the buffer at the buffer's current position and stores it in
-   * `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs whenever
-   * possible.
-   */
-  def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
-    setField(row, ordinal, extract(buffer))
-  }
-
-  /**
-   * Appends the given value v of type T into the given ByteBuffer.
-   */
-  def append(v: JvmType, buffer: ByteBuffer): Unit
-
-  /**
-   * Appends `row(ordinal)` of type T into the given ByteBuffer. Subclasses should override this
-   * method to avoid boxing/unboxing costs whenever possible.
-   */
-  def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
-    append(getField(row, ordinal), buffer)
-  }
-
-  /**
-   * Returns the size of the value `row(ordinal)`. This is used to calculate the size of variable
-   * length types such as byte arrays and strings.
-   */
-  def actualSize(row: InternalRow, ordinal: Int): Int = defaultSize
-
-  /**
-   * Returns `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs
-   * whenever possible.
-   */
-  def getField(row: InternalRow, ordinal: Int): JvmType
-
-  /**
-   * Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing
-   * costs whenever possible.
-   */
-  def setField(row: MutableRow, ordinal: Int, value: JvmType): Unit
-
-  /**
-   * Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid
-   * boxing/unboxing costs whenever possible.
-   */
-  def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
-    setField(to, toOrdinal, getField(from, fromOrdinal))
-  }
-
-  /**
-   * Creates a duplicated copy of the value.
-   */
-  def clone(v: JvmType): JvmType = v
-
-  override def toString: String = getClass.getSimpleName.stripSuffix("$")
-}
-
-private[sql] object NULL extends ColumnType[Any] {
-
-  override def dataType: DataType = NullType
-  override def defaultSize: Int = 0
-  override def append(v: Any, buffer: ByteBuffer): Unit = {}
-  override def extract(buffer: ByteBuffer): Any = null
-  override def setField(row: MutableRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
-  override def getField(row: InternalRow, ordinal: Int): Any = null
-}
-
-private[sql] abstract class NativeColumnType[T <: AtomicType](
-    val dataType: T,
-    val defaultSize: Int)
-  extends ColumnType[T#InternalType] {
-
-  /**
-   * Scala TypeTag. Can be used to create primitive arrays and hash tables.
-   */
-  def scalaTag: TypeTag[dataType.InternalType] = dataType.tag
-}
-
-private[sql] object INT extends NativeColumnType(IntegerType, 4) {
-  override def append(v: Int, buffer: ByteBuffer): Unit = {
-    buffer.putInt(v)
-  }
-
-  override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
-    buffer.putInt(row.getInt(ordinal))
-  }
-
-  override def extract(buffer: ByteBuffer): Int = {
-    ByteBufferHelper.getInt(buffer)
-  }
-
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
-    row.setInt(ordinal, ByteBufferHelper.getInt(buffer))
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
-    row.setInt(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal)
-
-
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
-    to.setInt(toOrdinal, from.getInt(fromOrdinal))
-  }
-}
-
-private[sql] object LONG extends NativeColumnType(LongType, 8) {
-  override def append(v: Long, buffer: ByteBuffer): Unit = {
-    buffer.putLong(v)
-  }
-
-  override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
-    buffer.putLong(row.getLong(ordinal))
-  }
-
-  override def extract(buffer: ByteBuffer): Long = {
-    ByteBufferHelper.getLong(buffer)
-  }
-
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
-    row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
-    row.setLong(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal)
-
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
-    to.setLong(toOrdinal, from.getLong(fromOrdinal))
-  }
-}
-
-private[sql] object FLOAT extends NativeColumnType(FloatType, 4) {
-  override def append(v: Float, buffer: ByteBuffer): Unit = {
-    buffer.putFloat(v)
-  }
-
-  override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
-    buffer.putFloat(row.getFloat(ordinal))
-  }
-
-  override def extract(buffer: ByteBuffer): Float = {
-    ByteBufferHelper.getFloat(buffer)
-  }
-
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
-    row.setFloat(ordinal, ByteBufferHelper.getFloat(buffer))
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Float): Unit = {
-    row.setFloat(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal)
-
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
-    to.setFloat(toOrdinal, from.getFloat(fromOrdinal))
-  }
-}
-
-private[sql] object DOUBLE extends NativeColumnType(DoubleType, 8) {
-  override def append(v: Double, buffer: ByteBuffer): Unit = {
-    buffer.putDouble(v)
-  }
-
-  override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
-    buffer.putDouble(row.getDouble(ordinal))
-  }
-
-  override def extract(buffer: ByteBuffer): Double = {
-    ByteBufferHelper.getDouble(buffer)
-  }
-
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
-    row.setDouble(ordinal, ByteBufferHelper.getDouble(buffer))
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Double): Unit = {
-    row.setDouble(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal)
-
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
-    to.setDouble(toOrdinal, from.getDouble(fromOrdinal))
-  }
-}
-
-private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 1) {
-  override def append(v: Boolean, buffer: ByteBuffer): Unit = {
-    buffer.put(if (v) 1: Byte else 0: Byte)
-  }
-
-  override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
-    buffer.put(if (row.getBoolean(ordinal)) 1: Byte else 0: Byte)
-  }
-
-  override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1
-
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
-    row.setBoolean(ordinal, buffer.get() == 1)
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Boolean): Unit = {
-    row.setBoolean(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal)
-
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
-    to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal))
-  }
-}
-
-private[sql] object BYTE extends NativeColumnType(ByteType, 1) {
-  override def append(v: Byte, buffer: ByteBuffer): Unit = {
-    buffer.put(v)
-  }
-
-  override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
-    buffer.put(row.getByte(ordinal))
-  }
-
-  override def extract(buffer: ByteBuffer): Byte = {
-    buffer.get()
-  }
-
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
-    row.setByte(ordinal, buffer.get())
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Byte): Unit = {
-    row.setByte(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal)
-
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
-    to.setByte(toOrdinal, from.getByte(fromOrdinal))
-  }
-}
-
-private[sql] object SHORT extends NativeColumnType(ShortType, 2) {
-  override def append(v: Short, buffer: ByteBuffer): Unit = {
-    buffer.putShort(v)
-  }
-
-  override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
-    buffer.putShort(row.getShort(ordinal))
-  }
-
-  override def extract(buffer: ByteBuffer): Short = {
-    buffer.getShort()
-  }
-
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
-    row.setShort(ordinal, buffer.getShort())
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Short): Unit = {
-    row.setShort(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal)
-
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
-    to.setShort(toOrdinal, from.getShort(fromOrdinal))
-  }
-}
-
-/**
- * A fast path to copy var-length bytes between ByteBuffer and UnsafeRow without creating wrapper
- * objects.
- */
-private[sql] trait DirectCopyColumnType[JvmType] extends ColumnType[JvmType] {
-
-  // copy the bytes from ByteBuffer to UnsafeRow
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
-    if (row.isInstanceOf[MutableUnsafeRow]) {
-      val numBytes = buffer.getInt
-      val cursor = buffer.position()
-      buffer.position(cursor + numBytes)
-      row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, buffer.array(),
-        buffer.arrayOffset() + cursor, numBytes)
-    } else {
-      setField(row, ordinal, extract(buffer))
-    }
-  }
-
-  // copy the bytes from UnsafeRow to ByteBuffer
-  override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
-    if (row.isInstanceOf[UnsafeRow]) {
-      row.asInstanceOf[UnsafeRow].writeFieldTo(ordinal, buffer)
-    } else {
-      super.append(row, ordinal, buffer)
-    }
-  }
-}
-
-private[sql] object STRING
-  extends NativeColumnType(StringType, 8) with DirectCopyColumnType[UTF8String] {
-
-  override def actualSize(row: InternalRow, ordinal: Int): Int = {
-    row.getUTF8String(ordinal).numBytes() + 4
-  }
-
-  override def append(v: UTF8String, buffer: ByteBuffer): Unit = {
-    buffer.putInt(v.numBytes())
-    v.writeTo(buffer)
-  }
-
-  override def extract(buffer: ByteBuffer): UTF8String = {
-    val length = buffer.getInt()
-    val cursor = buffer.position()
-    buffer.position(cursor + length)
-    UTF8String.fromBytes(buffer.array(), buffer.arrayOffset() + cursor, length)
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = {
-    if (row.isInstanceOf[MutableUnsafeRow]) {
-      row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, value)
-    } else {
-      row.update(ordinal, value.clone())
-    }
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): UTF8String = {
-    row.getUTF8String(ordinal)
-  }
-
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
-    setField(to, toOrdinal, getField(from, fromOrdinal))
-  }
-
-  override def clone(v: UTF8String): UTF8String = v.clone()
-}
-
-private[sql] case class COMPACT_DECIMAL(precision: Int, scale: Int)
-  extends NativeColumnType(DecimalType(precision, scale), 8) {
-
-  override def extract(buffer: ByteBuffer): Decimal = {
-    Decimal(ByteBufferHelper.getLong(buffer), precision, scale)
-  }
-
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
-    if (row.isInstanceOf[MutableUnsafeRow]) {
-      // copy it as Long
-      row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
-    } else {
-      setField(row, ordinal, extract(buffer))
-    }
-  }
-
-  override def append(v: Decimal, buffer: ByteBuffer): Unit = {
-    buffer.putLong(v.toUnscaledLong)
-  }
-
-  override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
-    if (row.isInstanceOf[UnsafeRow]) {
-      // copy it as Long
-      buffer.putLong(row.getLong(ordinal))
-    } else {
-      append(getField(row, ordinal), buffer)
-    }
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Decimal = {
-    row.getDecimal(ordinal, precision, scale)
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
-    row.setDecimal(ordinal, value, precision)
-  }
-
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
-    setField(to, toOrdinal, getField(from, fromOrdinal))
-  }
-}
-
-private[sql] object COMPACT_DECIMAL {
-  def apply(dt: DecimalType): COMPACT_DECIMAL = {
-    COMPACT_DECIMAL(dt.precision, dt.scale)
-  }
-}
-
-private[sql] sealed abstract class ByteArrayColumnType[JvmType](val defaultSize: Int)
-  extends ColumnType[JvmType] with DirectCopyColumnType[JvmType] {
-
-  def serialize(value: JvmType): Array[Byte]
-  def deserialize(bytes: Array[Byte]): JvmType
-
-  override def append(v: JvmType, buffer: ByteBuffer): Unit = {
-    val bytes = serialize(v)
-    buffer.putInt(bytes.length).put(bytes, 0, bytes.length)
-  }
-
-  override def extract(buffer: ByteBuffer): JvmType = {
-    val length = buffer.getInt()
-    val bytes = new Array[Byte](length)
-    buffer.get(bytes, 0, length)
-    deserialize(bytes)
-  }
-}
-
-private[sql] object BINARY extends ByteArrayColumnType[Array[Byte]](16) {
-
-  def dataType: DataType = BinaryType
-
-  override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = {
-    row.update(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Array[Byte] = {
-    row.getBinary(ordinal)
-  }
-
-  override def actualSize(row: InternalRow, ordinal: Int): Int = {
-    row.getBinary(ordinal).length + 4
-  }
-
-  def serialize(value: Array[Byte]): Array[Byte] = value
-  def deserialize(bytes: Array[Byte]): Array[Byte] = bytes
-}
-
-private[sql] case class LARGE_DECIMAL(precision: Int, scale: Int)
-  extends ByteArrayColumnType[Decimal](12) {
-
-  override val dataType: DataType = DecimalType(precision, scale)
-
-  override def getField(row: InternalRow, ordinal: Int): Decimal = {
-    row.getDecimal(ordinal, precision, scale)
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
-    row.setDecimal(ordinal, value, precision)
-  }
-
-  override def actualSize(row: InternalRow, ordinal: Int): Int = {
-    4 + getField(row, ordinal).toJavaBigDecimal.unscaledValue().bitLength() / 8 + 1
-  }
-
-  override def serialize(value: Decimal): Array[Byte] = {
-    value.toJavaBigDecimal.unscaledValue().toByteArray
-  }
-
-  override def deserialize(bytes: Array[Byte]): Decimal = {
-    val javaDecimal = new BigDecimal(new BigInteger(bytes), scale)
-    Decimal.apply(javaDecimal, precision, scale)
-  }
-}
-
-private[sql] object LARGE_DECIMAL {
-  def apply(dt: DecimalType): LARGE_DECIMAL = {
-    LARGE_DECIMAL(dt.precision, dt.scale)
-  }
-}
-
-private[sql] case class STRUCT(dataType: StructType)
-  extends ColumnType[UnsafeRow] with DirectCopyColumnType[UnsafeRow] {
-
-  private val numOfFields: Int = dataType.fields.size
-
-  override def defaultSize: Int = 20
-
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeRow): Unit = {
-    row.update(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): UnsafeRow = {
-    row.getStruct(ordinal, numOfFields).asInstanceOf[UnsafeRow]
-  }
-
-  override def actualSize(row: InternalRow, ordinal: Int): Int = {
-    4 + getField(row, ordinal).getSizeInBytes
-  }
-
-  override def append(value: UnsafeRow, buffer: ByteBuffer): Unit = {
-    buffer.putInt(value.getSizeInBytes)
-    value.writeTo(buffer)
-  }
-
-  override def extract(buffer: ByteBuffer): UnsafeRow = {
-    val sizeInBytes = ByteBufferHelper.getInt(buffer)
-    assert(buffer.hasArray)
-    val cursor = buffer.position()
-    buffer.position(cursor + sizeInBytes)
-    val unsafeRow = new UnsafeRow
-    unsafeRow.pointTo(
-      buffer.array(),
-      Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor,
-      numOfFields,
-      sizeInBytes)
-    unsafeRow
-  }
-
-  override def clone(v: UnsafeRow): UnsafeRow = v.copy()
-}
-
-private[sql] case class ARRAY(dataType: ArrayType)
-  extends ColumnType[UnsafeArrayData] with DirectCopyColumnType[UnsafeArrayData] {
-
-  override def defaultSize: Int = 16
-
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeArrayData): Unit = {
-    row.update(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): UnsafeArrayData = {
-    row.getArray(ordinal).asInstanceOf[UnsafeArrayData]
-  }
-
-  override def actualSize(row: InternalRow, ordinal: Int): Int = {
-    val unsafeArray = getField(row, ordinal)
-    4 + unsafeArray.getSizeInBytes
-  }
-
-  override def append(value: UnsafeArrayData, buffer: ByteBuffer): Unit = {
-    buffer.putInt(value.getSizeInBytes)
-    value.writeTo(buffer)
-  }
-
-  override def extract(buffer: ByteBuffer): UnsafeArrayData = {
-    val numBytes = buffer.getInt
-    assert(buffer.hasArray)
-    val cursor = buffer.position()
-    buffer.position(cursor + numBytes)
-    val array = new UnsafeArrayData
-    array.pointTo(
-      buffer.array(),
-      Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor,
-      numBytes)
-    array
-  }
-
-  override def clone(v: UnsafeArrayData): UnsafeArrayData = v.copy()
-}
-
-private[sql] case class MAP(dataType: MapType)
-  extends ColumnType[UnsafeMapData] with DirectCopyColumnType[UnsafeMapData] {
-
-  override def defaultSize: Int = 32
-
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeMapData): Unit = {
-    row.update(ordinal, value)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): UnsafeMapData = {
-    row.getMap(ordinal).asInstanceOf[UnsafeMapData]
-  }
-
-  override def actualSize(row: InternalRow, ordinal: Int): Int = {
-    val unsafeMap = getField(row, ordinal)
-    4 + unsafeMap.getSizeInBytes
-  }
-
-  override def append(value: UnsafeMapData, buffer: ByteBuffer): Unit = {
-    buffer.putInt(value.getSizeInBytes)
-    value.writeTo(buffer)
-  }
-
-  override def extract(buffer: ByteBuffer): UnsafeMapData = {
-    val numBytes = buffer.getInt
-    val cursor = buffer.position()
-    buffer.position(cursor + numBytes)
-    val map = new UnsafeMapData
-    map.pointTo(
-      buffer.array(),
-      Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor,
-      numBytes)
-    map
-  }
-
-  override def clone(v: UnsafeMapData): UnsafeMapData = v.copy()
-}
-
-private[sql] object ColumnType {
-  def apply(dataType: DataType): ColumnType[_] = {
-    dataType match {
-      case NullType => NULL
-      case BooleanType => BOOLEAN
-      case ByteType => BYTE
-      case ShortType => SHORT
-      case IntegerType | DateType => INT
-      case LongType | TimestampType => LONG
-      case FloatType => FLOAT
-      case DoubleType => DOUBLE
-      case StringType => STRING
-      case BinaryType => BINARY
-      case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => COMPACT_DECIMAL(dt)
-      case dt: DecimalType => LARGE_DECIMAL(dt)
-      case arr: ArrayType => ARRAY(arr)
-      case map: MapType => MAP(map)
-      case struct: StructType => STRUCT(struct)
-      case udt: UserDefinedType[_] => apply(udt.sqlType)
-      case other =>
-        throw new Exception(s"Unsupported type: $other")
-    }
-  }
-}
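
Each ColumnType above acts as a stateless codec over a ByteBuffer: append writes a value at the
buffer's current position and extract reads it back. A minimal sketch with the INT type
(package-private, so assume the snippet sits inside org.apache.spark.sql; a heap buffer in native
byte order is required, as the ByteBufferHelper warning notes):

  import java.nio.{ByteBuffer, ByteOrder}
  import org.apache.spark.sql.columnar.INT

  val buf = ByteBuffer.allocate(2 * INT.defaultSize).order(ByteOrder.nativeOrder())
  INT.append(10, buf)
  INT.append(20, buf)

  buf.rewind()
  assert(INT.extract(buf) == 10)
  assert(INT.extract(buf) == 20)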

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala
deleted file mode 100644
index ff9393b..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, CodeFormatter, CodeGenerator}
-import org.apache.spark.sql.types._
-
-/**
- * An Iterator to walk through the InternalRows from a CachedBatch
- */
-abstract class ColumnarIterator extends Iterator[InternalRow] {
-  def initialize(input: Iterator[CachedBatch], columnTypes: Array[DataType],
-    columnIndexes: Array[Int]): Unit
-}
-
-/**
- * A helper class to update the fields of UnsafeRow, used by ColumnAccessor.
- *
- * WARNING: These setters MUST be called in increasing order of ordinals.
- */
-class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) {
-
-  override def isNullAt(i: Int): Boolean = writer.isNullAt(i)
-  override def setNullAt(i: Int): Unit = writer.setNullAt(i)
-
-  override def setBoolean(i: Int, v: Boolean): Unit = writer.write(i, v)
-  override def setByte(i: Int, v: Byte): Unit = writer.write(i, v)
-  override def setShort(i: Int, v: Short): Unit = writer.write(i, v)
-  override def setInt(i: Int, v: Int): Unit = writer.write(i, v)
-  override def setLong(i: Int, v: Long): Unit = writer.write(i, v)
-  override def setFloat(i: Int, v: Float): Unit = writer.write(i, v)
-  override def setDouble(i: Int, v: Double): Unit = writer.write(i, v)
-
-  // the writer will be used directly to avoid creating wrapper objects
-  override def setDecimal(i: Int, v: Decimal, precision: Int): Unit =
-    throw new UnsupportedOperationException
-  override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException
-
-  // all other methods inherited from GenericMutableRow are not needed
-}
-
-/**
- * Generates bytecode for a [[ColumnarIterator]] for the columnar cache.
- */
-object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarIterator] with Logging {
-
-  protected def canonicalize(in: Seq[DataType]): Seq[DataType] = in
-  protected def bind(in: Seq[DataType], inputSchema: Seq[Attribute]): Seq[DataType] = in
-
-  protected def create(columnTypes: Seq[DataType]): ColumnarIterator = {
-    val ctx = newCodeGenContext()
-    val numFields = columnTypes.size
-    val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) =>
-      val accessorName = ctx.freshName("accessor")
-      val accessorCls = dt match {
-        case NullType => classOf[NullColumnAccessor].getName
-        case BooleanType => classOf[BooleanColumnAccessor].getName
-        case ByteType => classOf[ByteColumnAccessor].getName
-        case ShortType => classOf[ShortColumnAccessor].getName
-        case IntegerType | DateType => classOf[IntColumnAccessor].getName
-        case LongType | TimestampType => classOf[LongColumnAccessor].getName
-        case FloatType => classOf[FloatColumnAccessor].getName
-        case DoubleType => classOf[DoubleColumnAccessor].getName
-        case StringType => classOf[StringColumnAccessor].getName
-        case BinaryType => classOf[BinaryColumnAccessor].getName
-        case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS =>
-          classOf[CompactDecimalColumnAccessor].getName
-        case dt: DecimalType => classOf[DecimalColumnAccessor].getName
-        case struct: StructType => classOf[StructColumnAccessor].getName
-        case array: ArrayType => classOf[ArrayColumnAccessor].getName
-        case t: MapType => classOf[MapColumnAccessor].getName
-      }
-      ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;")
-
-      val createCode = dt match {
-        case t if ctx.isPrimitiveType(dt) =>
-          s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
-        case NullType | StringType | BinaryType =>
-          s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
-        case other =>
-          s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder),
-             (${dt.getClass.getName}) columnTypes[$index]);"""
-      }
-
-      val extract = s"$accessorName.extractTo(mutableRow, $index);"
-      val patch = dt match {
-        case DecimalType.Fixed(p, s) if p > Decimal.MAX_LONG_DIGITS =>
-          // For a large Decimal, reserve 16 bytes for future updates even if it's null now.
-          s"""
-            if (mutableRow.isNullAt($index)) {
-              rowWriter.write($index, (Decimal) null, $p, $s);
-            }
-           """
-        case other => ""
-      }
-      (createCode, extract + patch)
-    }.unzip
-
-    val code = s"""
-      import java.nio.ByteBuffer;
-      import java.nio.ByteOrder;
-      import scala.collection.Iterator;
-      import org.apache.spark.sql.types.DataType;
-      import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
-      import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
-      import org.apache.spark.sql.columnar.MutableUnsafeRow;
-
-      public SpecificColumnarIterator generate($exprType[] expr) {
-        return new SpecificColumnarIterator();
-      }
-
-      class SpecificColumnarIterator extends ${classOf[ColumnarIterator].getName} {
-
-        private ByteOrder nativeOrder = null;
-        private byte[][] buffers = null;
-        private UnsafeRow unsafeRow = new UnsafeRow();
-        private BufferHolder bufferHolder = new BufferHolder();
-        private UnsafeRowWriter rowWriter = new UnsafeRowWriter();
-        private MutableUnsafeRow mutableRow = null;
-
-        private int currentRow = 0;
-        private int numRowsInBatch = 0;
-
-        private scala.collection.Iterator input = null;
-        private DataType[] columnTypes = null;
-        private int[] columnIndexes = null;
-
-        ${declareMutableStates(ctx)}
-
-        public SpecificColumnarIterator() {
-          this.nativeOrder = ByteOrder.nativeOrder();
-          this.buffers = new byte[${columnTypes.length}][];
-          this.mutableRow = new MutableUnsafeRow(rowWriter);
-
-          ${initMutableStates(ctx)}
-        }
-
-        public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
-          this.input = input;
-          this.columnTypes = columnTypes;
-          this.columnIndexes = columnIndexes;
-        }
-
-        public boolean hasNext() {
-          if (currentRow < numRowsInBatch) {
-            return true;
-          }
-          if (!input.hasNext()) {
-            return false;
-          }
-
-          ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next();
-          currentRow = 0;
-          numRowsInBatch = batch.numRows();
-          for (int i = 0; i < columnIndexes.length; i ++) {
-            buffers[i] = batch.buffers()[columnIndexes[i]];
-          }
-          ${initializeAccessors.mkString("\n")}
-
-          return hasNext();
-        }
-
-        public InternalRow next() {
-          currentRow += 1;
-          bufferHolder.reset();
-          rowWriter.initialize(bufferHolder, $numFields);
-          ${extractors.mkString("\n")}
-          unsafeRow.pointTo(bufferHolder.buffer, $numFields, bufferHolder.totalSize());
-          return unsafeRow;
-        }
-      }"""
-
-    logDebug(s"Generated ColumnarIterator: ${CodeFormatter.format(code)}")
-
-    compile(code).generate(ctx.references.toArray).asInstanceOf[ColumnarIterator]
-  }
-}
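
The generated SpecificColumnarIterator above is essentially a batch-flattening iterator:
hasNext() pulls the next CachedBatch once the current one is exhausted (recursing so that
empty batches are skipped), and next() materializes a single row from the current batch into
a reused UnsafeRow. A self-contained, REPL-style Scala sketch of that control flow, using
simplified stand-in types rather than the Spark classes:

    // Simplified stand-in for the generated iterator: batches are plain IndexedSeq values
    // instead of CachedBatch column buffers, and rows are returned as-is rather than being
    // written into a reused UnsafeRow.
    class BatchIterator[T](input: Iterator[IndexedSeq[T]]) extends Iterator[T] {
      private var current: IndexedSeq[T] = IndexedSeq.empty
      private var pos = 0

      override def hasNext: Boolean = {
        if (pos < current.length) return true
        if (!input.hasNext) return false
        current = input.next()
        pos = 0
        hasNext  // recurse, as the generated hasNext() does, so empty batches are skipped
      }

      override def next(): T = {
        val value = current(pos)
        pos += 1
        value
      }
    }

    // Three rows arriving across batches (one of them empty) come out as a single row stream:
    println(new BatchIterator(Iterator(IndexedSeq(1, 2), IndexedSeq(), IndexedSeq(3))).toList)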

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
deleted file mode 100644
index ae77298..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{ConvertToUnsafe, LeafNode, SparkPlan}
-import org.apache.spark.sql.types.UserDefinedType
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.{Accumulable, Accumulator, Accumulators}
-
-private[sql] object InMemoryRelation {
-  def apply(
-      useCompression: Boolean,
-      batchSize: Int,
-      storageLevel: StorageLevel,
-      child: SparkPlan,
-      tableName: Option[String]): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel,
-      if (child.outputsUnsafeRows) child else ConvertToUnsafe(child),
-      tableName)()
-}
-
-/**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The statistics of the columns
- */
-private[sql] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
-
-private[sql] case class InMemoryRelation(
-    output: Seq[Attribute],
-    useCompression: Boolean,
-    batchSize: Int,
-    storageLevel: StorageLevel,
-    @transient child: SparkPlan,
-    tableName: Option[String])(
-    @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    @transient private var _statistics: Statistics = null,
-    private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
-  extends LogicalPlan with MultiInstanceRelation {
-
-  private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
-    if (_batchStats == null) {
-      child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
-    } else {
-      _batchStats
-    }
-
-  @transient val partitionStatistics = new PartitionStatistics(output)
-
-  private def computeSizeInBytes = {
-    val sizeOfRow: Expression =
-      BindReferences.bindReference(
-        output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
-        partitionStatistics.schema)
-
-    batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
-  }
-
-  // Statistics propagation contracts:
-  // 1. Non-null `_statistics` must reflect the actual statistics of the underlying data
-  // 2. Only propagate statistics when `_statistics` is non-null
-  private def statisticsToBePropagated = if (_statistics == null) {
-    val updatedStats = statistics
-    if (_statistics == null) null else updatedStats
-  } else {
-    _statistics
-  }
-
-  override def statistics: Statistics = {
-    if (_statistics == null) {
-      if (batchStats.value.isEmpty) {
-        // The underlying columnar RDD hasn't been materialized yet, so no useful statistics are
-        // available; return the default statistics.
-        Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
-      } else {
-        // The underlying columnar RDD has been materialized and the required information has been
-        // collected via the `batchStats` accumulator; compute the final statistics
-        // and update `_statistics`.
-        _statistics = Statistics(sizeInBytes = computeSizeInBytes)
-        _statistics
-      }
-    } else {
-      // Pre-computed statistics
-      _statistics
-    }
-  }
-
-  // If the cached column buffers were not passed in, we calculate them in the constructor.
-  // As usual in Spark, the actual work of caching is lazy.
-  if (_cachedColumnBuffers == null) {
-    buildBuffers()
-  }
-
-  def recache(): Unit = {
-    _cachedColumnBuffers.unpersist()
-    _cachedColumnBuffers = null
-    buildBuffers()
-  }
-
-  private def buildBuffers(): Unit = {
-    val output = child.output
-    val cached = child.execute().mapPartitionsInternal { rowIterator =>
-      new Iterator[CachedBatch] {
-        def next(): CachedBatch = {
-          val columnBuilders = output.map { attribute =>
-            ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
-          }.toArray
-
-          var rowCount = 0
-          var totalSize = 0L
-          while (rowIterator.hasNext && rowCount < batchSize
-            && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
-            val row = rowIterator.next()
-
-            // Added for SPARK-6082. This assertion can be useful for scenarios where something
-            // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
-            // may result in malformed rows, causing an ArrayIndexOutOfBoundsException, which is
-            // somewhat hard to decipher.
-            assert(
-              row.numFields == columnBuilders.size,
-              s"Row column number mismatch, expected ${output.size} columns, " +
-                s"but got ${row.numFields}." +
-                s"\nRow content: $row")
-
-            var i = 0
-            totalSize = 0
-            while (i < row.numFields) {
-              columnBuilders(i).appendFrom(row, i)
-              totalSize += columnBuilders(i).columnStats.sizeInBytes
-              i += 1
-            }
-            rowCount += 1
-          }
-
-          val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
-                        .flatMap(_.values))
-
-          batchStats += stats
-          CachedBatch(rowCount, columnBuilders.map(_.build().array()), stats)
-        }
-
-        def hasNext: Boolean = rowIterator.hasNext
-      }
-    }.persist(storageLevel)
-
-    cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString))
-    _cachedColumnBuffers = cached
-  }
-
-  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
-    InMemoryRelation(
-      newOutput, useCompression, batchSize, storageLevel, child, tableName)(
-      _cachedColumnBuffers, statisticsToBePropagated, batchStats)
-  }
-
-  override def children: Seq[LogicalPlan] = Seq.empty
-
-  override def newInstance(): this.type = {
-    new InMemoryRelation(
-      output.map(_.newInstance()),
-      useCompression,
-      batchSize,
-      storageLevel,
-      child,
-      tableName)(
-      _cachedColumnBuffers,
-      statisticsToBePropagated,
-      batchStats).asInstanceOf[this.type]
-  }
-
-  def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
-
-  override protected def otherCopyArgs: Seq[AnyRef] =
-    Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
-
-  private[sql] def uncache(blocking: Boolean): Unit = {
-    Accumulators.remove(batchStats.id)
-    cachedColumnBuffers.unpersist(blocking)
-    _cachedColumnBuffers = null
-  }
-}
-
-private[sql] case class InMemoryColumnarTableScan(
-    attributes: Seq[Attribute],
-    predicates: Seq[Expression],
-    @transient relation: InMemoryRelation)
-  extends LeafNode {
-
-  override def output: Seq[Attribute] = attributes
-
-  // The cached version does not change the outputPartitioning of the original SparkPlan.
-  override def outputPartitioning: Partitioning = relation.child.outputPartitioning
-
-  // The cached version does not change the outputOrdering of the original SparkPlan.
-  override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
-
-  override def outputsUnsafeRows: Boolean = true
-
-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
-
-  // Returned filter predicate should return false iff it is impossible for the input expression
-  // to evaluate to `true` based on statistics collected about this partition batch.
-  @transient val buildFilter: PartialFunction[Expression, Expression] = {
-    case And(lhs: Expression, rhs: Expression)
-      if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
-      (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
-
-    case Or(lhs: Expression, rhs: Expression)
-      if buildFilter.isDefinedAt(lhs) && buildFilter.isDefinedAt(rhs) =>
-      buildFilter(lhs) || buildFilter(rhs)
-
-    case EqualTo(a: AttributeReference, l: Literal) =>
-      statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
-    case EqualTo(l: Literal, a: AttributeReference) =>
-      statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
-
-    case LessThan(a: AttributeReference, l: Literal) => statsFor(a).lowerBound < l
-    case LessThan(l: Literal, a: AttributeReference) => l < statsFor(a).upperBound
-
-    case LessThanOrEqual(a: AttributeReference, l: Literal) => statsFor(a).lowerBound <= l
-    case LessThanOrEqual(l: Literal, a: AttributeReference) => l <= statsFor(a).upperBound
-
-    case GreaterThan(a: AttributeReference, l: Literal) => l < statsFor(a).upperBound
-    case GreaterThan(l: Literal, a: AttributeReference) => statsFor(a).lowerBound < l
-
-    case GreaterThanOrEqual(a: AttributeReference, l: Literal) => l <= statsFor(a).upperBound
-    case GreaterThanOrEqual(l: Literal, a: AttributeReference) => statsFor(a).lowerBound <= l
-
-    case IsNull(a: Attribute) => statsFor(a).nullCount > 0
-    case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0
-  }
-
-  val partitionFilters: Seq[Expression] = {
-    predicates.flatMap { p =>
-      val filter = buildFilter.lift(p)
-      val boundFilter =
-        filter.map(
-          BindReferences.bindReference(
-            _,
-            relation.partitionStatistics.schema,
-            allowFailures = true))
-
-      boundFilter.foreach(_ =>
-        filter.foreach(f => logInfo(s"Predicate $p generates partition filter: $f")))
-
-      // If the filter can't be resolved then we are missing required statistics.
-      boundFilter.filter(_.resolved)
-    }
-  }
-
-  lazy val enableAccumulators: Boolean =
-    sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean
-
-  // Accumulators used for testing purposes
-  lazy val readPartitions: Accumulator[Int] = sparkContext.accumulator(0)
-  lazy val readBatches: Accumulator[Int] = sparkContext.accumulator(0)
-
-  private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    if (enableAccumulators) {
-      readPartitions.setValue(0)
-      readBatches.setValue(0)
-    }
-
-    // Using these variables here to avoid serialization of entire objects (if referenced directly)
-    // within the mapPartitions closure.
-    val schema = relation.partitionStatistics.schema
-    val schemaIndex = schema.zipWithIndex
-    val relOutput = relation.output
-    val buffers = relation.cachedColumnBuffers
-
-    buffers.mapPartitionsInternal { cachedBatchIterator =>
-      val partitionFilter = newPredicate(
-        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
-        schema)
-
-      // Find the ordinals and data types of the requested columns.
-      val (requestedColumnIndices, requestedColumnDataTypes) =
-        attributes.map { a =>
-          relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType
-        }.unzip
-
-      // Do partition batch pruning if enabled
-      val cachedBatchesToScan =
-        if (inMemoryPartitionPruningEnabled) {
-          cachedBatchIterator.filter { cachedBatch =>
-            if (!partitionFilter(cachedBatch.stats)) {
-              def statsString: String = schemaIndex.map {
-                case (a, i) =>
-                  val value = cachedBatch.stats.get(i, a.dataType)
-                  s"${a.name}: $value"
-              }.mkString(", ")
-              logInfo(s"Skipping partition based on stats $statsString")
-              false
-            } else {
-              if (enableAccumulators) {
-                readBatches += 1
-              }
-              true
-            }
-          }
-        } else {
-          cachedBatchIterator
-        }
-
-      val columnTypes = requestedColumnDataTypes.map {
-        case udt: UserDefinedType[_] => udt.sqlType
-        case other => other
-      }.toArray
-      val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
-      columnarIterator.initialize(cachedBatchesToScan, columnTypes, requestedColumnIndices.toArray)
-      if (enableAccumulators && columnarIterator.hasNext) {
-        readPartitions += 1
-      }
-      columnarIterator
-    }
-  }
-}
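
The buildFilter partial function above is the core of partition batch pruning: a cached
batch is read only if the per-column statistics gathered while it was built leave the
predicate possibly true. A self-contained sketch of that rule, specialized to a single
integer column; BatchStats and mightMatch are illustrative names only, and the real code
also tracks null counts for the IsNull / IsNotNull cases:

    // Per-batch statistics for one integer column.
    case class BatchStats(lower: Int, upper: Int)

    // Returns false only when the predicate cannot possibly hold for any row in the batch,
    // mirroring the contract of the filters produced by buildFilter.
    def mightMatch(stats: BatchStats, op: String, value: Int): Boolean = op match {
      case "="  => stats.lower <= value && value <= stats.upper
      case "<"  => stats.lower < value
      case "<=" => stats.lower <= value
      case ">"  => value < stats.upper
      case ">=" => value <= stats.upper
      case _    => true  // unknown predicate: never prune
    }

    // A batch covering the range 10..20 cannot contain 42, so the scan skips it entirely:
    assert(!mightMatch(BatchStats(10, 20), "=", 42))
    // The same batch may hold values greater than 15, so it is read:
    assert(mightMatch(BatchStats(10, 20), ">", 15))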

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
deleted file mode 100644
index 7eaecfe..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import java.nio.{ByteOrder, ByteBuffer}
-
-import org.apache.spark.sql.catalyst.expressions.MutableRow
-
-private[sql] trait NullableColumnAccessor extends ColumnAccessor {
-  private var nullsBuffer: ByteBuffer = _
-  private var nullCount: Int = _
-  private var seenNulls: Int = 0
-
-  private var nextNullIndex: Int = _
-  private var pos: Int = 0
-
-  abstract override protected def initialize(): Unit = {
-    nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder())
-    nullCount = ByteBufferHelper.getInt(nullsBuffer)
-    nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
-    pos = 0
-
-    underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4)
-    super.initialize()
-  }
-
-  abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = {
-    if (pos == nextNullIndex) {
-      seenNulls += 1
-
-      if (seenNulls < nullCount) {
-        nextNullIndex = ByteBufferHelper.getInt(nullsBuffer)
-      }
-
-      row.setNullAt(ordinal)
-    } else {
-      super.extractTo(row, ordinal)
-    }
-
-    pos += 1
-  }
-
-  abstract override def hasNext: Boolean = seenNulls < nullCount || super.hasNext
-}
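
NullableColumnAccessor above consumes a small null header (the null count followed by the
null positions) through a duplicate view of the buffer, then advances the main buffer past
that header so that only element data remains for the underlying accessor. A self-contained
java.nio sketch of that read path, with hand-built values used purely for illustration:

    import java.nio.{ByteBuffer, ByteOrder}

    // Hand-built column buffer: header "2 nulls, at rows 1 and 3", then the non-null ints.
    val buf = ByteBuffer.allocate(4 + 2 * 4 + 2 * 4).order(ByteOrder.nativeOrder())
    buf.putInt(2).putInt(1).putInt(3)   // null count, then null positions
    buf.putInt(10).putInt(20)           // non-null values for rows 0 and 2
    buf.rewind()

    // Read path: a duplicate view walks the null header while the main buffer is moved past
    // it, just as initialize() does with underlyingBuffer.
    val nullsView = buf.duplicate().order(ByteOrder.nativeOrder())
    val nullCount = nullsView.getInt()
    val nullPositions = Seq.fill(nullCount)(nullsView.getInt())
    buf.position(buf.position() + 4 + nullCount * 4)

    println(s"nulls at rows $nullPositions, first non-null value = ${buf.getInt()}")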

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
deleted file mode 100644
index 76cfddf..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.spark.sql.catalyst.InternalRow
-
-/**
- * A stackable trait used for building a byte buffer for a column containing null values.  The
- * memory layout of the final byte buffer is:
- * {{{
- *    .------------------- Null count N (4 bytes)
- *    |   .--------------- Null positions (4 x N bytes, empty if null count is zero)
- *    |   |     .--------- Non-null elements
- *    V   V     V
- *   +---+-----+---------+
- *   |   | ... | ... ... |
- *   +---+-----+---------+
- * }}}
- */
-private[sql] trait NullableColumnBuilder extends ColumnBuilder {
-  protected var nulls: ByteBuffer = _
-  protected var nullCount: Int = _
-  private var pos: Int = _
-
-  abstract override def initialize(
-      initialSize: Int,
-      columnName: String,
-      useCompression: Boolean): Unit = {
-
-    nulls = ByteBuffer.allocate(1024)
-    nulls.order(ByteOrder.nativeOrder())
-    pos = 0
-    nullCount = 0
-    super.initialize(initialSize, columnName, useCompression)
-  }
-
-  abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
-    columnStats.gatherStats(row, ordinal)
-    if (row.isNullAt(ordinal)) {
-      nulls = ColumnBuilder.ensureFreeSpace(nulls, 4)
-      nulls.putInt(pos)
-      nullCount += 1
-    } else {
-      super.appendFrom(row, ordinal)
-    }
-    pos += 1
-  }
-
-  abstract override def build(): ByteBuffer = {
-    val nonNulls = super.build()
-    val nullDataLen = nulls.position()
-
-    nulls.limit(nullDataLen)
-    nulls.rewind()
-
-    val buffer = ByteBuffer
-      .allocate(4 + nullDataLen + nonNulls.remaining())
-      .order(ByteOrder.nativeOrder())
-      .putInt(nullCount)
-      .put(nulls)
-      .put(nonNulls)
-
-    buffer.rewind()
-    buffer
-  }
-
-  protected def buildNonNulls(): ByteBuffer = {
-    nulls.limit(nulls.position()).rewind()
-    super.build()
-  }
-}
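
The write side of the layout documented above is symmetric: appendFrom records the position
of every null row, and build() prepends "null count + null positions" to the non-null data
produced by the wrapped builder. A minimal sketch under simplified assumptions (rows are
plain Option[Int] values rather than InternalRow ordinals, and the buffer is sized exactly
instead of grown on demand):

    import java.nio.{ByteBuffer, ByteOrder}

    // Builds one nullable int column following the documented layout:
    // [null count][null positions][non-null elements].
    def buildNullableIntColumn(rows: Seq[Option[Int]]): ByteBuffer = {
      val nullPositions = rows.zipWithIndex.collect { case (None, i) => i }
      val nonNulls = rows.flatten
      val buf = ByteBuffer
        .allocate(4 + 4 * nullPositions.length + 4 * nonNulls.length)
        .order(ByteOrder.nativeOrder())
      buf.putInt(nullPositions.length)
      nullPositions.foreach(p => buf.putInt(p))
      nonNulls.foreach(v => buf.putInt(v))
      buf.rewind()
      buf
    }

    // Rows 1 and 3 are null, so the buffer holds [2, 1, 3] followed by the values 10 and 20:
    val column = buildNullableIntColumn(Seq(Some(10), None, Some(20), None))
    println((0 until 5).map(_ => column.getInt()))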

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
deleted file mode 100644
index cb205de..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar.compression
-
-import org.apache.spark.sql.catalyst.expressions.MutableRow
-import org.apache.spark.sql.columnar.{ColumnAccessor, NativeColumnAccessor}
-import org.apache.spark.sql.types.AtomicType
-
-private[sql] trait CompressibleColumnAccessor[T <: AtomicType] extends ColumnAccessor {
-  this: NativeColumnAccessor[T] =>
-
-  private var decoder: Decoder[T] = _
-
-  abstract override protected def initialize(): Unit = {
-    super.initialize()
-    decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType)
-  }
-
-  abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext
-
-  override def extractSingle(row: MutableRow, ordinal: Int): Unit = {
-    decoder.next(row, ordinal)
-  }
-}
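
CompressibleColumnAccessor reads one more 4-byte field after the null header, the
compression scheme ID, and uses it to pick the decoder that unpacks the remaining bytes.
A self-contained sketch of that dispatch, with made-up scheme IDs and int-only decoders
standing in for Spark's CompressionScheme machinery:

    import java.nio.{ByteBuffer, ByteOrder}

    // Two illustrative decoders: pass-through, and run-length stored as (value, run) pairs.
    val passThroughDecoder = (buf: ByteBuffer) => Seq.fill(buf.remaining() / 4)(buf.getInt())
    val runLengthDecoder = (buf: ByteBuffer) => {
      val out = Seq.newBuilder[Int]
      while (buf.hasRemaining) {
        val value = buf.getInt()
        out ++= Seq.fill(buf.getInt())(value)
      }
      out.result()
    }
    val decoders = Map(0 -> passThroughDecoder, 1 -> runLengthDecoder)

    // Encoded column body: scheme ID 1 (run-length), then "value 7 repeated 3 times".
    val encoded = ByteBuffer.allocate(12).order(ByteOrder.nativeOrder())
    encoded.putInt(1).putInt(7).putInt(3)
    encoded.rewind()

    val schemeId = encoded.getInt()
    println(decoders(schemeId)(encoded))   // List(7, 7, 7)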

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
deleted file mode 100644
index 161021f..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar.compression
-
-import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.columnar.{ColumnBuilder, NativeColumnBuilder}
-import org.apache.spark.sql.types.AtomicType
-
-/**
- * A stackable trait that builds an optionally compressed byte buffer for a column.  The memory
- * layout of the final byte buffer is:
- * {{{
- *    .----------------------- Null count N (4 bytes)
- *    |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
- *    |   |     .------------- Compression scheme ID (4 bytes)
- *    |   |     |   .--------- Compressed non-null elements
- *    V   V     V   V
- *   +---+-----+---+---------+
- *   |   | ... |   | ... ... |
- *   +---+-----+---+---------+
- *    \-------/ \-----------/
- *     header         body
- * }}}
- */
-private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
-  extends ColumnBuilder with Logging {
-
-  this: NativeColumnBuilder[T] with WithCompressionSchemes =>
-
-  var compressionEncoders: Seq[Encoder[T]] = _
-
-  abstract override def initialize(
-      initialSize: Int,
-      columnName: String,
-      useCompression: Boolean): Unit = {
-
-    compressionEncoders =
-      if (useCompression) {
-        schemes.filter(_.supports(columnType)).map(_.encoder[T](columnType))
-      } else {
-        Seq(PassThrough.encoder(columnType))
-      }
-    super.initialize(initialSize, columnName, useCompression)
-  }
-
-  protected def isWorthCompressing(encoder: Encoder[T]) = {
-    encoder.compressionRatio < 0.8
-  }
-
-  private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
-    var i = 0
-    while (i < compressionEncoders.length) {
-      compressionEncoders(i).gatherCompressibilityStats(row, ordinal)
-      i += 1
-    }
-  }
-
-  abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
-    super.appendFrom(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      gatherCompressibilityStats(row, ordinal)
-    }
-  }
-
-  override def build(): ByteBuffer = {
-    val nonNullBuffer = buildNonNulls()
-    val encoder: Encoder[T] = {
-      val candidate = compressionEncoders.minBy(_.compressionRatio)
-      if (isWorthCompressing(candidate)) candidate else PassThrough.encoder(columnType)
-    }
-
-    // Header = null count + null positions
-    val headerSize = 4 + nulls.limit()
-    val compressedSize = if (encoder.compressedSize == 0) {
-      nonNullBuffer.remaining()
-    } else {
-      encoder.compressedSize
-    }
-
-    val compressedBuffer = ByteBuffer
-      // Reserves 4 bytes for compression scheme ID
-      .allocate(headerSize + 4 + compressedSize)
-      .order(ByteOrder.nativeOrder)
-      // Write the header
-      .putInt(nullCount)
-      .put(nulls)
-
-    logDebug(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}")
-    encoder.compress(nonNullBuffer, compressedBuffer)
-  }
-}
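
A detail worth noting in CompressibleColumnBuilder.build() above: every candidate encoder
keeps a running compression-ratio estimate while rows are appended, and at build time the
best candidate is used only when its ratio beats the 0.8 threshold in isWorthCompressing;
otherwise the column falls back to PassThrough. A minimal sketch of that selection rule,
with CandidateEncoder and chooseEncoder as illustrative names:

    // Simplified stand-in for an encoder that has already gathered compressibility stats.
    case class CandidateEncoder(name: String, compressionRatio: Double)

    // Pick the candidate with the best (smallest) ratio, but only if compressing is worth it.
    def chooseEncoder(candidates: Seq[CandidateEncoder],
                      passThrough: CandidateEncoder): CandidateEncoder = {
      val best = candidates.minBy(_.compressionRatio)
      if (best.compressionRatio < 0.8) best else passThrough
    }

    val passThrough = CandidateEncoder("PassThrough", 1.0)
    // Run-length encoding pays off here, so it is selected:
    println(chooseEncoder(Seq(CandidateEncoder("RunLength", 0.3),
                              CandidateEncoder("Dictionary", 0.9)), passThrough))
    // No candidate beats the 0.8 threshold, so the column is stored uncompressed:
    println(chooseEncoder(Seq(CandidateEncoder("RunLength", 0.95)), passThrough))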



