spark-commits mailing list archives

From r...@apache.org
Subject [3/5] spark git commit: [SPARK-11858][SQL] Move sql.columnar into sql.execution.
Date Thu, 19 Nov 2015 22:48:24 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
new file mode 100644
index 0000000..ce701fb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{ConvertToUnsafe, LeafNode, SparkPlan}
+import org.apache.spark.sql.types.UserDefinedType
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.{Accumulable, Accumulator, Accumulators}
+
+private[sql] object InMemoryRelation {
+  def apply(
+      useCompression: Boolean,
+      batchSize: Int,
+      storageLevel: StorageLevel,
+      child: SparkPlan,
+      tableName: Option[String]): InMemoryRelation =
+    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel,
+      if (child.outputsUnsafeRows) child else ConvertToUnsafe(child),
+      tableName)()
+}
+
+/**
+ * CachedBatch is a cached batch of rows.
+ *
+ * @param numRows The total number of rows in this batch
+ * @param buffers The buffers for serialized columns
+ * @param stats The statistics of the columns
+ */
+private[columnar]
+case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+
+private[sql] case class InMemoryRelation(
+    output: Seq[Attribute],
+    useCompression: Boolean,
+    batchSize: Int,
+    storageLevel: StorageLevel,
+    @transient child: SparkPlan,
+    tableName: Option[String])(
+    @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null,
+    @transient private var _statistics: Statistics = null,
+    private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
+  extends LogicalPlan with MultiInstanceRelation {
+
+  private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
+    if (_batchStats == null) {
+      child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
+    } else {
+      _batchStats
+    }
+
+  @transient val partitionStatistics = new PartitionStatistics(output)
+
+  private def computeSizeInBytes = {
+    val sizeOfRow: Expression =
+      BindReferences.bindReference(
+        output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
+        partitionStatistics.schema)
+
+    batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+  }
+
+  // Statistics propagation contracts:
+  // 1. Non-null `_statistics` must reflect the actual statistics of the underlying data
+  // 2. Only propagate statistics when `_statistics` is non-null
+  private def statisticsToBePropagated = if (_statistics == null) {
+    val updatedStats = statistics
+    if (_statistics == null) null else updatedStats
+  } else {
+    _statistics
+  }
+
+  override def statistics: Statistics = {
+    if (_statistics == null) {
+      if (batchStats.value.isEmpty) {
+        // Underlying columnar RDD hasn't been materialized, no useful statistics information
+        // available, return the default statistics.
+        Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+      } else {
+        // Underlying columnar RDD has been materialized, required information has also been
+        // collected via the `batchStats` accumulator, compute the final statistics,
+        // and update `_statistics`.
+        _statistics = Statistics(sizeInBytes = computeSizeInBytes)
+        _statistics
+      }
+    } else {
+      // Pre-computed statistics
+      _statistics
+    }
+  }
+
+  // If the cached column buffers were not passed in, we calculate them in the constructor.
+  // As in Spark, the actual work of caching is lazy.
+  if (_cachedColumnBuffers == null) {
+    buildBuffers()
+  }
+
+  def recache(): Unit = {
+    _cachedColumnBuffers.unpersist()
+    _cachedColumnBuffers = null
+    buildBuffers()
+  }
+
+  private def buildBuffers(): Unit = {
+    val output = child.output
+    val cached = child.execute().mapPartitionsInternal { rowIterator =>
+      new Iterator[CachedBatch] {
+        def next(): CachedBatch = {
+          val columnBuilders = output.map { attribute =>
+            ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
+          }.toArray
+
+          var rowCount = 0
+          var totalSize = 0L
+          while (rowIterator.hasNext && rowCount < batchSize
+            && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
+            val row = rowIterator.next()
+
+            // Added for SPARK-6082. This assertion can be useful for scenarios when something
+            // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
+            // may result in malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
+            // hard to decipher.
+            assert(
+              row.numFields == columnBuilders.size,
+              s"Row column number mismatch, expected ${output.size} columns, " +
+                s"but got ${row.numFields}." +
+                s"\nRow content: $row")
+
+            var i = 0
+            totalSize = 0
+            while (i < row.numFields) {
+              columnBuilders(i).appendFrom(row, i)
+              totalSize += columnBuilders(i).columnStats.sizeInBytes
+              i += 1
+            }
+            rowCount += 1
+          }
+
+          val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
+                        .flatMap(_.values))
+
+          batchStats += stats
+          CachedBatch(rowCount, columnBuilders.map(_.build().array()), stats)
+        }
+
+        def hasNext: Boolean = rowIterator.hasNext
+      }
+    }.persist(storageLevel)
+
+    cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString))
+    _cachedColumnBuffers = cached
+  }
+
+  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
+    InMemoryRelation(
+      newOutput, useCompression, batchSize, storageLevel, child, tableName)(
+      _cachedColumnBuffers, statisticsToBePropagated, batchStats)
+  }
+
+  override def children: Seq[LogicalPlan] = Seq.empty
+
+  override def newInstance(): this.type = {
+    new InMemoryRelation(
+      output.map(_.newInstance()),
+      useCompression,
+      batchSize,
+      storageLevel,
+      child,
+      tableName)(
+      _cachedColumnBuffers,
+      statisticsToBePropagated,
+      batchStats).asInstanceOf[this.type]
+  }
+
+  def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
+
+  override protected def otherCopyArgs: Seq[AnyRef] =
+    Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
+
+  private[sql] def uncache(blocking: Boolean): Unit = {
+    Accumulators.remove(batchStats.id)
+    cachedColumnBuffers.unpersist(blocking)
+    _cachedColumnBuffers = null
+  }
+}
+
+private[sql] case class InMemoryColumnarTableScan(
+    attributes: Seq[Attribute],
+    predicates: Seq[Expression],
+    @transient relation: InMemoryRelation)
+  extends LeafNode {
+
+  override def output: Seq[Attribute] = attributes
+
+  // The cached version does not change the outputPartitioning of the original SparkPlan.
+  override def outputPartitioning: Partitioning = relation.child.outputPartitioning
+
+  // The cached version does not change the outputOrdering of the original SparkPlan.
+  override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
+
+  override def outputsUnsafeRows: Boolean = true
+
+  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+
+  // The returned filter predicate should return false iff it is impossible for the input
+  // expression to evaluate to `true` based on statistics collected about this partition batch.
+  @transient val buildFilter: PartialFunction[Expression, Expression] = {
+    case And(lhs: Expression, rhs: Expression)
+      if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
+      (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
+
+    case Or(lhs: Expression, rhs: Expression)
+      if buildFilter.isDefinedAt(lhs) && buildFilter.isDefinedAt(rhs) =>
+      buildFilter(lhs) || buildFilter(rhs)
+
+    case EqualTo(a: AttributeReference, l: Literal) =>
+      statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
+    case EqualTo(l: Literal, a: AttributeReference) =>
+      statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
+
+    case LessThan(a: AttributeReference, l: Literal) => statsFor(a).lowerBound < l
+    case LessThan(l: Literal, a: AttributeReference) => l < statsFor(a).upperBound
+
+    case LessThanOrEqual(a: AttributeReference, l: Literal) => statsFor(a).lowerBound <= l
+    case LessThanOrEqual(l: Literal, a: AttributeReference) => l <= statsFor(a).upperBound
+
+    case GreaterThan(a: AttributeReference, l: Literal) => l < statsFor(a).upperBound
+    case GreaterThan(l: Literal, a: AttributeReference) => statsFor(a).lowerBound < l
+
+    case GreaterThanOrEqual(a: AttributeReference, l: Literal) => l <= statsFor(a).upperBound
+    case GreaterThanOrEqual(l: Literal, a: AttributeReference) => statsFor(a).lowerBound <= l
+
+    case IsNull(a: Attribute) => statsFor(a).nullCount > 0
+    case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0
+  }
+
+  val partitionFilters: Seq[Expression] = {
+    predicates.flatMap { p =>
+      val filter = buildFilter.lift(p)
+      val boundFilter =
+        filter.map(
+          BindReferences.bindReference(
+            _,
+            relation.partitionStatistics.schema,
+            allowFailures = true))
+
+      boundFilter.foreach(_ =>
+        filter.foreach(f => logInfo(s"Predicate $p generates partition filter: $f")))
+
+      // If the filter can't be resolved then we are missing required statistics.
+      boundFilter.filter(_.resolved)
+    }
+  }
+
+  lazy val enableAccumulators: Boolean =
+    sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean
+
+  // Accumulators used for testing purposes
+  lazy val readPartitions: Accumulator[Int] = sparkContext.accumulator(0)
+  lazy val readBatches: Accumulator[Int] = sparkContext.accumulator(0)
+
+  private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    if (enableAccumulators) {
+      readPartitions.setValue(0)
+      readBatches.setValue(0)
+    }
+
+    // Using these variables here to avoid serialization of entire objects (if referenced directly)
+    // within the mapPartitions closure.
+    val schema = relation.partitionStatistics.schema
+    val schemaIndex = schema.zipWithIndex
+    val relOutput = relation.output
+    val buffers = relation.cachedColumnBuffers
+
+    buffers.mapPartitionsInternal { cachedBatchIterator =>
+      val partitionFilter = newPredicate(
+        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+        schema)
+
+      // Find the ordinals and data types of the requested columns.
+      val (requestedColumnIndices, requestedColumnDataTypes) =
+        attributes.map { a =>
+          relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType
+        }.unzip
+
+      // Do partition batch pruning if enabled
+      val cachedBatchesToScan =
+        if (inMemoryPartitionPruningEnabled) {
+          cachedBatchIterator.filter { cachedBatch =>
+            if (!partitionFilter(cachedBatch.stats)) {
+              def statsString: String = schemaIndex.map {
+                case (a, i) =>
+                  val value = cachedBatch.stats.get(i, a.dataType)
+                  s"${a.name}: $value"
+              }.mkString(", ")
+              logInfo(s"Skipping partition based on stats $statsString")
+              false
+            } else {
+              if (enableAccumulators) {
+                readBatches += 1
+              }
+              true
+            }
+          }
+        } else {
+          cachedBatchIterator
+        }
+
+      val columnTypes = requestedColumnDataTypes.map {
+        case udt: UserDefinedType[_] => udt.sqlType
+        case other => other
+      }.toArray
+      val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
+      columnarIterator.initialize(cachedBatchesToScan, columnTypes, requestedColumnIndices.toArray)
+      if (enableAccumulators && columnarIterator.hasNext) {
+        readPartitions += 1
+      }
+      columnarIterator
+    }
+  }
+}
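
The scan above prunes whole cached batches using the per-batch column statistics (lower bound, upper bound, null count) collected while building the buffers, scanning a batch only when its stats cannot rule the predicate out. A minimal, self-contained sketch of that pruning idea, using plain case classes instead of Spark's InternalRow and expression machinery (all names below are illustrative, not Spark APIs):

// Per-batch statistics for a single Int column, analogous to the
// (lowerBound, upperBound, nullCount, count, sizeInBytes) values kept per batch.
case class IntBatchStats(lower: Int, upper: Int, nullCount: Int, rowCount: Int)

case class Batch(stats: IntBatchStats, rows: Array[Int])

object PruningSketch {
  // Returns false only when the predicate cannot possibly match any row in the
  // batch, mirroring the contract of `buildFilter` above.
  def mightMatchEqualTo(stats: IntBatchStats, literal: Int): Boolean =
    stats.lower <= literal && literal <= stats.upper

  def mightMatchIsNull(stats: IntBatchStats): Boolean =
    stats.nullCount > 0

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Batch(IntBatchStats(1, 100, 0, 3), Array(1, 50, 100)),
      Batch(IntBatchStats(200, 300, 0, 3), Array(200, 250, 300)))

    // Scan only the batches whose stats admit `col = 250`; the first batch is skipped.
    val survivors = batches.filter(b => mightMatchEqualTo(b.stats, 250))
    println(s"Batches scanned: ${survivors.size} of ${batches.size}")
    println(survivors.flatMap(_.rows).filter(_ == 250).mkString(", "))
  }
}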

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
new file mode 100644
index 0000000..8d99546
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+import org.apache.spark.sql.catalyst.expressions.MutableRow
+
+private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
+  private var nullsBuffer: ByteBuffer = _
+  private var nullCount: Int = _
+  private var seenNulls: Int = 0
+
+  private var nextNullIndex: Int = _
+  private var pos: Int = 0
+
+  abstract override protected def initialize(): Unit = {
+    nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder())
+    nullCount = ByteBufferHelper.getInt(nullsBuffer)
+    nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
+    pos = 0
+
+    underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4)
+    super.initialize()
+  }
+
+  abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+    if (pos == nextNullIndex) {
+      seenNulls += 1
+
+      if (seenNulls < nullCount) {
+        nextNullIndex = ByteBufferHelper.getInt(nullsBuffer)
+      }
+
+      row.setNullAt(ordinal)
+    } else {
+      super.extractTo(row, ordinal)
+    }
+
+    pos += 1
+  }
+
+  abstract override def hasNext: Boolean = seenNulls < nullCount || super.hasNext
+}
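
NullableColumnAccessor above expects its buffer to start with a 4-byte null count followed by the null positions, and walks those positions while extracting values. A standalone sketch of that read path over a hand-built buffer, using only java.nio (names and layout details here are illustrative, not the Spark classes):

import java.nio.{ByteBuffer, ByteOrder}

object NullAwareReadSketch {
  def main(args: Array[String]): Unit = {
    // Layout: [null count][null positions...][non-null Int values...]
    // Column values: 10, null, 30, null, 50  =>  nulls at positions 1 and 3.
    val buf = ByteBuffer.allocate(64).order(ByteOrder.nativeOrder())
    buf.putInt(2).putInt(1).putInt(3)          // null header
    buf.putInt(10).putInt(30).putInt(50)       // non-null elements only
    buf.flip()

    val nullCount = buf.getInt()
    val nullPositions = Array.fill(nullCount)(buf.getInt())

    var nextNull = 0
    val decoded = (0 until 5).map { pos =>
      if (nextNull < nullCount && nullPositions(nextNull) == pos) {
        nextNull += 1
        None                                   // analogous to row.setNullAt(ordinal)
      } else {
        Some(buf.getInt())                     // analogous to super.extractTo(row, ordinal)
      }
    }
    println(decoded.mkString(", "))            // Some(10), None, Some(30), None, Some(50)
  }
}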

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala
new file mode 100644
index 0000000..3a1931b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+/**
+ * A stackable trait used for building byte buffer for a column containing null values.  Memory
+ * layout of the final byte buffer is:
+ * {{{
+ *    .------------------- Null count N (4 bytes)
+ *    |   .--------------- Null positions (4 x N bytes, empty if null count is zero)
+ *    |   |     .--------- Non-null elements
+ *    V   V     V
+ *   +---+-----+---------+
+ *   |   | ... | ... ... |
+ *   +---+-----+---------+
+ * }}}
+ */
+private[columnar] trait NullableColumnBuilder extends ColumnBuilder {
+  protected var nulls: ByteBuffer = _
+  protected var nullCount: Int = _
+  private var pos: Int = _
+
+  abstract override def initialize(
+      initialSize: Int,
+      columnName: String,
+      useCompression: Boolean): Unit = {
+
+    nulls = ByteBuffer.allocate(1024)
+    nulls.order(ByteOrder.nativeOrder())
+    pos = 0
+    nullCount = 0
+    super.initialize(initialSize, columnName, useCompression)
+  }
+
+  abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
+    columnStats.gatherStats(row, ordinal)
+    if (row.isNullAt(ordinal)) {
+      nulls = ColumnBuilder.ensureFreeSpace(nulls, 4)
+      nulls.putInt(pos)
+      nullCount += 1
+    } else {
+      super.appendFrom(row, ordinal)
+    }
+    pos += 1
+  }
+
+  abstract override def build(): ByteBuffer = {
+    val nonNulls = super.build()
+    val nullDataLen = nulls.position()
+
+    nulls.limit(nullDataLen)
+    nulls.rewind()
+
+    val buffer = ByteBuffer
+      .allocate(4 + nullDataLen + nonNulls.remaining())
+      .order(ByteOrder.nativeOrder())
+      .putInt(nullCount)
+      .put(nulls)
+      .put(nonNulls)
+
+    buffer.rewind()
+    buffer
+  }
+
+  protected def buildNonNulls(): ByteBuffer = {
+    nulls.limit(nulls.position()).rewind()
+    super.build()
+  }
+}
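
The builder's write side produces the layout documented in the Scaladoc above: null count, then null positions, then the non-null elements. A rough sketch of assembling that layout for a small Int column, again without any Spark types (the names are made up for illustration):

import java.nio.{ByteBuffer, ByteOrder}

object NullHeaderBuildSketch {
  def main(args: Array[String]): Unit = {
    val values: Seq[Option[Int]] = Seq(Some(10), None, Some(30), None, Some(50))

    // Body: non-null elements only; header: positions of the null entries.
    val nullPositions = values.zipWithIndex.collect { case (None, pos) => pos }
    val nonNulls = ByteBuffer.allocate(values.size * 4).order(ByteOrder.nativeOrder())
    values.flatten.foreach(v => nonNulls.putInt(v))
    nonNulls.flip()

    // Final layout, as in build(): [null count][null positions][non-null elements]
    val out = ByteBuffer
      .allocate(4 + nullPositions.size * 4 + nonNulls.remaining())
      .order(ByteOrder.nativeOrder())
      .putInt(nullPositions.size)
    nullPositions.foreach(p => out.putInt(p))
    out.put(nonNulls)
    out.rewind()

    println(s"Built ${out.remaining()} bytes: header = ${4 + nullPositions.size * 4}, " +
      s"body = ${values.flatten.size * 4}")
  }
}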

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
new file mode 100644
index 0000000..6579b50
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.execution.columnar.{ColumnAccessor, NativeColumnAccessor}
+import org.apache.spark.sql.types.AtomicType
+
+private[columnar] trait CompressibleColumnAccessor[T <: AtomicType] extends ColumnAccessor {
+  this: NativeColumnAccessor[T] =>
+
+  private var decoder: Decoder[T] = _
+
+  abstract override protected def initialize(): Unit = {
+    super.initialize()
+    decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType)
+  }
+
+  abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext
+
+  override def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+    decoder.next(row, ordinal)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
new file mode 100644
index 0000000..b0e216f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.columnar.{ColumnBuilder, NativeColumnBuilder}
+import org.apache.spark.sql.types.AtomicType
+
+/**
+ * A stackable trait that builds optionally compressed byte buffer for a column.  Memory layout of
+ * the final byte buffer is:
+ * {{{
+ *    .----------------------- Null count N (4 bytes)
+ *    |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
+ *    |   |     .------------- Compression scheme ID (4 bytes)
+ *    |   |     |   .--------- Compressed non-null elements
+ *    V   V     V   V
+ *   +---+-----+---+---------+
+ *   |   | ... |   | ... ... |
+ *   +---+-----+---+---------+
+ *    \-------/ \-----------/
+ *     header         body
+ * }}}
+ */
+private[columnar] trait CompressibleColumnBuilder[T <: AtomicType]
+  extends ColumnBuilder with Logging {
+
+  this: NativeColumnBuilder[T] with WithCompressionSchemes =>
+
+  var compressionEncoders: Seq[Encoder[T]] = _
+
+  abstract override def initialize(
+      initialSize: Int,
+      columnName: String,
+      useCompression: Boolean): Unit = {
+
+    compressionEncoders =
+      if (useCompression) {
+        schemes.filter(_.supports(columnType)).map(_.encoder[T](columnType))
+      } else {
+        Seq(PassThrough.encoder(columnType))
+      }
+    super.initialize(initialSize, columnName, useCompression)
+  }
+
+  protected def isWorthCompressing(encoder: Encoder[T]) = {
+    encoder.compressionRatio < 0.8
+  }
+
+  private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
+    var i = 0
+    while (i < compressionEncoders.length) {
+      compressionEncoders(i).gatherCompressibilityStats(row, ordinal)
+      i += 1
+    }
+  }
+
+  abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
+    super.appendFrom(row, ordinal)
+    if (!row.isNullAt(ordinal)) {
+      gatherCompressibilityStats(row, ordinal)
+    }
+  }
+
+  override def build(): ByteBuffer = {
+    val nonNullBuffer = buildNonNulls()
+    val encoder: Encoder[T] = {
+      val candidate = compressionEncoders.minBy(_.compressionRatio)
+      if (isWorthCompressing(candidate)) candidate else PassThrough.encoder(columnType)
+    }
+
+    // Header = null count + null positions
+    val headerSize = 4 + nulls.limit()
+    val compressedSize = if (encoder.compressedSize == 0) {
+      nonNullBuffer.remaining()
+    } else {
+      encoder.compressedSize
+    }
+
+    val compressedBuffer = ByteBuffer
+      // Reserves 4 bytes for compression scheme ID
+      .allocate(headerSize + 4 + compressedSize)
+      .order(ByteOrder.nativeOrder)
+      // Write the header
+      .putInt(nullCount)
+      .put(nulls)
+
+    logDebug(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}")
+    encoder.compress(nonNullBuffer, compressedBuffer)
+  }
+}
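
build() above picks the candidate encoder with the lowest compression ratio, and falls back to pass-through when even the best candidate is not worth the trouble (ratio of 0.8 or higher). A tiny sketch of just that selection rule, with a stand-in for the Encoder trait (the case class and sizes below are illustrative):

object EncoderSelectionSketch {
  // A stand-in for compression.Encoder: just enough to show the selection rule.
  final case class EncoderStats(name: String, uncompressedSize: Int, compressedSize: Int) {
    def compressionRatio: Double =
      if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
  }

  def main(args: Array[String]): Unit = {
    val passThrough = EncoderStats("PassThrough", 4000, 4000)
    val candidates = Seq(
      EncoderStats("RunLengthEncoding", 4000, 900),
      EncoderStats("DictionaryEncoding", 4000, 3500))

    // Mirror of build(): take the candidate with the best ratio, but fall back to
    // pass-through unless it is "worth compressing" (ratio < 0.8).
    val best = candidates.minBy(_.compressionRatio)
    val chosen = if (best.compressionRatio < 0.8) best else passThrough

    println(s"Chosen encoder: ${chosen.name}, ratio ${chosen.compressionRatio}")
  }
}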

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
new file mode 100644
index 0000000..920381f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import java.nio.{ByteBuffer, ByteOrder}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.execution.columnar.{ColumnType, NativeColumnType}
+import org.apache.spark.sql.types.AtomicType
+
+private[columnar] trait Encoder[T <: AtomicType] {
+  def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {}
+
+  def compressedSize: Int
+
+  def uncompressedSize: Int
+
+  def compressionRatio: Double = {
+    if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
+  }
+
+  def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer
+}
+
+private[columnar] trait Decoder[T <: AtomicType] {
+  def next(row: MutableRow, ordinal: Int): Unit
+
+  def hasNext: Boolean
+}
+
+private[columnar] trait CompressionScheme {
+  def typeId: Int
+
+  def supports(columnType: ColumnType[_]): Boolean
+
+  def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T]
+
+  def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T]
+}
+
+private[columnar] trait WithCompressionSchemes {
+  def schemes: Seq[CompressionScheme]
+}
+
+private[columnar] trait AllCompressionSchemes extends WithCompressionSchemes {
+  override val schemes: Seq[CompressionScheme] = CompressionScheme.all
+}
+
+private[columnar] object CompressionScheme {
+  val all: Seq[CompressionScheme] =
+    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+
+  private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap
+
+  def apply(typeId: Int): CompressionScheme = {
+    typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException(
+      s"Unrecognized compression scheme type ID: $typeId"))
+  }
+
+  def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
+    val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
+    val nullCount = header.getInt()
+    // null count + null positions
+    4 + 4 * nullCount
+  }
+}
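
Each compressed column buffer carries the null header followed by a 4-byte scheme type ID, which is how CompressionScheme.apply selects a decoder. A small sketch of reading that header and dispatching on the ID over a hand-built buffer (the registry map and toy body below are illustrative, not the Spark object):

import java.nio.{ByteBuffer, ByteOrder}

object SchemeDispatchSketch {
  // Illustrative registry keyed by the scheme type IDs declared above (0 to 5).
  val schemeNames: Map[Int, String] = Map(
    0 -> "PassThrough", 1 -> "RunLengthEncoding", 2 -> "DictionaryEncoding",
    3 -> "BooleanBitSet", 4 -> "IntDelta", 5 -> "LongDelta")

  def main(args: Array[String]): Unit = {
    // A toy column buffer: [null count = 1][null position = 2][scheme id = 4][body...]
    val buf = ByteBuffer.allocate(32).order(ByteOrder.nativeOrder())
    buf.putInt(1).putInt(2).putInt(4).putInt(42)
    buf.flip()

    // columnHeaderSize: 4 bytes of null count plus 4 bytes per null position.
    val header = buf.duplicate().order(ByteOrder.nativeOrder())
    val headerSize = 4 + 4 * header.getInt()

    buf.position(headerSize)                 // skip the null header
    val typeId = buf.getInt()                // the decoder is then chosen from this ID
    println(s"header = $headerSize bytes, scheme = ${schemeNames(typeId)}")
  }
}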

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
new file mode 100644
index 0000000..941f03b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.execution.columnar._
+import org.apache.spark.sql.types._
+
+
+private[columnar] case object PassThrough extends CompressionScheme {
+  override val typeId = 0
+
+  override def supports(columnType: ColumnType[_]): Boolean = true
+
+  override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = {
+    new this.Encoder[T](columnType)
+  }
+
+  override def decoder[T <: AtomicType](
+      buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = {
+    new this.Decoder(buffer, columnType)
+  }
+
+  class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] {
+    override def uncompressedSize: Int = 0
+
+    override def compressedSize: Int = 0
+
+    override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = {
+      // Writes compression type ID and copies raw contents
+      to.putInt(PassThrough.typeId).put(from).rewind()
+      to
+    }
+  }
+
+  class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    extends compression.Decoder[T] {
+
+    override def next(row: MutableRow, ordinal: Int): Unit = {
+      columnType.extract(buffer, row, ordinal)
+    }
+
+    override def hasNext: Boolean = buffer.hasRemaining
+  }
+}
+
+private[columnar] case object RunLengthEncoding extends CompressionScheme {
+  override val typeId = 1
+
+  override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = {
+    new this.Encoder[T](columnType)
+  }
+
+  override def decoder[T <: AtomicType](
+      buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = {
+    new this.Decoder(buffer, columnType)
+  }
+
+  override def supports(columnType: ColumnType[_]): Boolean = columnType match {
+    case INT | LONG | SHORT | BYTE | STRING | BOOLEAN => true
+    case _ => false
+  }
+
+  class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] {
+    private var _uncompressedSize = 0
+    private var _compressedSize = 0
+
+    // Using `MutableRow` to store the last value to avoid boxing/unboxing cost.
+    private val lastValue = new SpecificMutableRow(Seq(columnType.dataType))
+    private var lastRun = 0
+
+    override def uncompressedSize: Int = _uncompressedSize
+
+    override def compressedSize: Int = _compressedSize
+
+    override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
+      val value = columnType.getField(row, ordinal)
+      val actualSize = columnType.actualSize(row, ordinal)
+      _uncompressedSize += actualSize
+
+      if (lastValue.isNullAt(0)) {
+        columnType.copyField(row, ordinal, lastValue, 0)
+        lastRun = 1
+        _compressedSize += actualSize + 4
+      } else {
+        if (columnType.getField(lastValue, 0) == value) {
+          lastRun += 1
+        } else {
+          _compressedSize += actualSize + 4
+          columnType.copyField(row, ordinal, lastValue, 0)
+          lastRun = 1
+        }
+      }
+    }
+
+    override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = {
+      to.putInt(RunLengthEncoding.typeId)
+
+      if (from.hasRemaining) {
+        val currentValue = new SpecificMutableRow(Seq(columnType.dataType))
+        var currentRun = 1
+        val value = new SpecificMutableRow(Seq(columnType.dataType))
+
+        columnType.extract(from, currentValue, 0)
+
+        while (from.hasRemaining) {
+          columnType.extract(from, value, 0)
+
+          if (value.get(0, columnType.dataType) == currentValue.get(0, columnType.dataType)) {
+            currentRun += 1
+          } else {
+            // Writes current run
+            columnType.append(currentValue, 0, to)
+            to.putInt(currentRun)
+
+            // Resets current run
+            columnType.copyField(value, 0, currentValue, 0)
+            currentRun = 1
+          }
+        }
+
+        // Writes the last run
+        columnType.append(currentValue, 0, to)
+        to.putInt(currentRun)
+      }
+
+      to.rewind()
+      to
+    }
+  }
+
+  class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    extends compression.Decoder[T] {
+
+    private var run = 0
+    private var valueCount = 0
+    private var currentValue: T#InternalType = _
+
+    override def next(row: MutableRow, ordinal: Int): Unit = {
+      if (valueCount == run) {
+        currentValue = columnType.extract(buffer)
+        run = ByteBufferHelper.getInt(buffer)
+        valueCount = 1
+      } else {
+        valueCount += 1
+      }
+
+      columnType.setField(row, ordinal, currentValue)
+    }
+
+    override def hasNext: Boolean = valueCount < run || buffer.hasRemaining
+  }
+}
+
+private[columnar] case object DictionaryEncoding extends CompressionScheme {
+  override val typeId = 2
+
+  // 32K unique values allowed
+  val MAX_DICT_SIZE = Short.MaxValue
+
+  override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    : Decoder[T] = {
+    new this.Decoder(buffer, columnType)
+  }
+
+  override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = {
+    new this.Encoder[T](columnType)
+  }
+
+  override def supports(columnType: ColumnType[_]): Boolean = columnType match {
+    case INT | LONG | STRING => true
+    case _ => false
+  }
+
+  class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] {
+    // Size of the input, uncompressed, in bytes. Note that we only count until the dictionary
+    // overflows.
+    private var _uncompressedSize = 0
+
+    // If the number of distinct elements is too large, we discard the use of dictionary encoding
+    // and set the overflow flag to true.
+    private var overflow = false
+
+    // Total number of elements.
+    private var count = 0
+
+    // The reverse mapping of `dictionary`, i.e. mapping an encoded short integer back to the value.
+    private var values = new mutable.ArrayBuffer[T#InternalType](1024)
+
+    // The dictionary that maps a value to the encoded short integer.
+    private val dictionary = mutable.HashMap.empty[Any, Short]
+
+    // Size of the serialized dictionary in bytes. Initialized to 4 since we need at least an `Int`
+    // to store dictionary element count.
+    private var dictionarySize = 4
+
+    override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
+      val value = columnType.getField(row, ordinal)
+
+      if (!overflow) {
+        val actualSize = columnType.actualSize(row, ordinal)
+        count += 1
+        _uncompressedSize += actualSize
+
+        if (!dictionary.contains(value)) {
+          if (dictionary.size < MAX_DICT_SIZE) {
+            val clone = columnType.clone(value)
+            values += clone
+            dictionarySize += actualSize
+            dictionary(clone) = dictionary.size.toShort
+          } else {
+            overflow = true
+            values.clear()
+            dictionary.clear()
+          }
+        }
+      }
+    }
+
+    override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = {
+      if (overflow) {
+        throw new IllegalStateException(
+          "Dictionary encoding should not be used because of dictionary overflow.")
+      }
+
+      to.putInt(DictionaryEncoding.typeId)
+        .putInt(dictionary.size)
+
+      var i = 0
+      while (i < values.length) {
+        columnType.append(values(i), to)
+        i += 1
+      }
+
+      while (from.hasRemaining) {
+        to.putShort(dictionary(columnType.extract(from)))
+      }
+
+      to.rewind()
+      to
+    }
+
+    override def uncompressedSize: Int = _uncompressedSize
+
+    override def compressedSize: Int = if (overflow) Int.MaxValue else dictionarySize + count * 2
+  }
+
+  class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    extends compression.Decoder[T] {
+
+    private val dictionary: Array[Any] = {
+      val elementNum = ByteBufferHelper.getInt(buffer)
+      Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any])
+    }
+
+    override def next(row: MutableRow, ordinal: Int): Unit = {
+      columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType])
+    }
+
+    override def hasNext: Boolean = buffer.hasRemaining
+  }
+}
+
+private[columnar] case object BooleanBitSet extends CompressionScheme {
+  override val typeId = 3
+
+  val BITS_PER_LONG = 64
+
+  override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    : compression.Decoder[T] = {
+    new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]]
+  }
+
+  override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = {
+    (new this.Encoder).asInstanceOf[compression.Encoder[T]]
+  }
+
+  override def supports(columnType: ColumnType[_]): Boolean = columnType == BOOLEAN
+
+  class Encoder extends compression.Encoder[BooleanType.type] {
+    private var _uncompressedSize = 0
+
+    override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
+      _uncompressedSize += BOOLEAN.defaultSize
+    }
+
+    override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = {
+      to.putInt(BooleanBitSet.typeId)
+        // Total element count (1 byte per Boolean value)
+        .putInt(from.remaining)
+
+      while (from.remaining >= BITS_PER_LONG) {
+        var word = 0: Long
+        var i = 0
+
+        while (i < BITS_PER_LONG) {
+          if (BOOLEAN.extract(from)) {
+            word |= (1: Long) << i
+          }
+          i += 1
+        }
+
+        to.putLong(word)
+      }
+
+      if (from.hasRemaining) {
+        var word = 0: Long
+        var i = 0
+
+        while (from.hasRemaining) {
+          if (BOOLEAN.extract(from)) {
+            word |= (1: Long) << i
+          }
+          i += 1
+        }
+
+        to.putLong(word)
+      }
+
+      to.rewind()
+      to
+    }
+
+    override def uncompressedSize: Int = _uncompressedSize
+
+    override def compressedSize: Int = {
+      val extra = if (_uncompressedSize % BITS_PER_LONG == 0) 0 else 1
+      (_uncompressedSize / BITS_PER_LONG + extra) * 8 + 4
+    }
+  }
+
+  class Decoder(buffer: ByteBuffer) extends compression.Decoder[BooleanType.type] {
+    private val count = ByteBufferHelper.getInt(buffer)
+
+    private var currentWord = 0: Long
+
+    private var visited: Int = 0
+
+    override def next(row: MutableRow, ordinal: Int): Unit = {
+      val bit = visited % BITS_PER_LONG
+
+      visited += 1
+      if (bit == 0) {
+        currentWord = ByteBufferHelper.getLong(buffer)
+      }
+
+      row.setBoolean(ordinal, ((currentWord >> bit) & 1) != 0)
+    }
+
+    override def hasNext: Boolean = visited < count
+  }
+}
+
+private[columnar] case object IntDelta extends CompressionScheme {
+  override def typeId: Int = 4
+
+  override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    : compression.Decoder[T] = {
+    new Decoder(buffer, INT).asInstanceOf[compression.Decoder[T]]
+  }
+
+  override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = {
+    (new Encoder).asInstanceOf[compression.Encoder[T]]
+  }
+
+  override def supports(columnType: ColumnType[_]): Boolean = columnType == INT
+
+  class Encoder extends compression.Encoder[IntegerType.type] {
+    protected var _compressedSize: Int = 0
+    protected var _uncompressedSize: Int = 0
+
+    override def compressedSize: Int = _compressedSize
+    override def uncompressedSize: Int = _uncompressedSize
+
+    private var prevValue: Int = _
+
+    override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
+      val value = row.getInt(ordinal)
+      val delta = value - prevValue
+
+      _compressedSize += 1
+
+      // If this is the first integer to be compressed, or the delta is out of byte range, then give
+      // up compressing this integer.
+      if (_uncompressedSize == 0 || delta <= Byte.MinValue || delta > Byte.MaxValue) {
+        _compressedSize += INT.defaultSize
+      }
+
+      _uncompressedSize += INT.defaultSize
+      prevValue = value
+    }
+
+    override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = {
+      to.putInt(typeId)
+
+      if (from.hasRemaining) {
+        var prev = from.getInt()
+        to.put(Byte.MinValue)
+        to.putInt(prev)
+
+        while (from.hasRemaining) {
+          val current = from.getInt()
+          val delta = current - prev
+          prev = current
+
+          if (Byte.MinValue < delta && delta <= Byte.MaxValue) {
+            to.put(delta.toByte)
+          } else {
+            to.put(Byte.MinValue)
+            to.putInt(current)
+          }
+        }
+      }
+
+      to.rewind().asInstanceOf[ByteBuffer]
+    }
+  }
+
+  class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type])
+    extends compression.Decoder[IntegerType.type] {
+
+    private var prev: Int = _
+
+    override def hasNext: Boolean = buffer.hasRemaining
+
+    override def next(row: MutableRow, ordinal: Int): Unit = {
+      val delta = buffer.get()
+      prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer)
+      row.setInt(ordinal, prev)
+    }
+  }
+}
+
+private[columnar] case object LongDelta extends CompressionScheme {
+  override def typeId: Int = 5
+
+  override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
+    : compression.Decoder[T] = {
+    new Decoder(buffer, LONG).asInstanceOf[compression.Decoder[T]]
+  }
+
+  override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = {
+    (new Encoder).asInstanceOf[compression.Encoder[T]]
+  }
+
+  override def supports(columnType: ColumnType[_]): Boolean = columnType == LONG
+
+  class Encoder extends compression.Encoder[LongType.type] {
+    protected var _compressedSize: Int = 0
+    protected var _uncompressedSize: Int = 0
+
+    override def compressedSize: Int = _compressedSize
+    override def uncompressedSize: Int = _uncompressedSize
+
+    private var prevValue: Long = _
+
+    override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
+      val value = row.getLong(ordinal)
+      val delta = value - prevValue
+
+      _compressedSize += 1
+
+      // If this is the first long integer to be compressed, or the delta is out of byte range, then
+      // give up compressing this long integer.
+      if (_uncompressedSize == 0 || delta <= Byte.MinValue || delta > Byte.MaxValue) {
+        _compressedSize += LONG.defaultSize
+      }
+
+      _uncompressedSize += LONG.defaultSize
+      prevValue = value
+    }
+
+    override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = {
+      to.putInt(typeId)
+
+      if (from.hasRemaining) {
+        var prev = from.getLong()
+        to.put(Byte.MinValue)
+        to.putLong(prev)
+
+        while (from.hasRemaining) {
+          val current = from.getLong()
+          val delta = current - prev
+          prev = current
+
+          if (Byte.MinValue < delta && delta <= Byte.MaxValue) {
+            to.put(delta.toByte)
+          } else {
+            to.put(Byte.MinValue)
+            to.putLong(current)
+          }
+        }
+      }
+
+      to.rewind().asInstanceOf[ByteBuffer]
+    }
+  }
+
+  class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type])
+    extends compression.Decoder[LongType.type] {
+
+    private var prev: Long = _
+
+    override def hasNext: Boolean = buffer.hasRemaining
+
+    override def next(row: MutableRow, ordinal: Int): Unit = {
+      val delta = buffer.get()
+      prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer)
+      row.setLong(ordinal, prev)
+    }
+  }
+}
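
IntDelta and LongDelta above store a run of values as one-byte deltas, escaping to the full value (prefixed with Byte.MinValue) whenever a delta does not fit in a byte. Reserving Byte.MinValue as the escape marker is why the decoder's test is `delta > Byte.MinValue`: that byte value is never written as a real delta. A self-contained round-trip sketch of the Int variant, independent of the ColumnType machinery (all names are illustrative):

import java.nio.{ByteBuffer, ByteOrder}

object IntDeltaSketch {
  // Encode: one escape byte (Byte.MinValue) plus a full Int whenever the delta does
  // not fit in a single byte; otherwise just the one-byte delta.
  def encode(values: Array[Int]): ByteBuffer = {
    val out = ByteBuffer.allocate(values.length * 5).order(ByteOrder.nativeOrder())
    var prev = 0
    var first = true
    values.foreach { v =>
      val delta = v - prev
      if (first || delta <= Byte.MinValue || delta > Byte.MaxValue) {
        out.put(Byte.MinValue).putInt(v)
      } else {
        out.put(delta.toByte)
      }
      prev = v
      first = false
    }
    out.flip()
    out
  }

  def decode(buf: ByteBuffer): Seq[Int] = {
    var prev = 0
    val result = scala.collection.mutable.ArrayBuffer.empty[Int]
    while (buf.hasRemaining) {
      val delta = buf.get()
      prev = if (delta > Byte.MinValue) prev + delta else buf.getInt()
      result += prev
    }
    result.toSeq
  }

  def main(args: Array[String]): Unit = {
    val input = Array(100, 101, 103, 103, 5000, 5001)
    val encoded = encode(input)
    println(s"${input.length * 4} bytes of ints -> ${encoded.remaining()} bytes encoded")
    println(decode(encoded).mkString(", "))   // round-trips back to the input
  }
}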

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala
index 28fa231..c912734 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala
@@ -19,5 +19,7 @@ package org.apache.spark.sql
 
 /**
  * The physical execution component of Spark SQL. Note that this is a private package.
+ * All classes in catalyst are considered an internal API to Spark SQL and are subject
+ * to change between minor releases.
  */
 package object execution

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index bce94da..d86df4c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -27,7 +27,7 @@ import scala.language.postfixOps
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.Accumulators
-import org.apache.spark.sql.columnar._
+import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext}
 import org.apache.spark.storage.{StorageLevel, RDDBlockId}
@@ -280,7 +280,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sql("CACHE TABLE testData")
     sqlContext.table("testData").queryExecution.withCachedData.collect {
       case cached: InMemoryRelation =>
-        val actualSizeInBytes = (1 to 100).map(i => INT.defaultSize + i.toString.length + 4).sum
+        val actualSizeInBytes = (1 to 100).map(i => 4 + i.toString.length + 4).sum
         assert(cached.statistics.sizeInBytes === actualSizeInBytes)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index b5417b1..6ea1fe4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
 
 abstract class QueryTest extends PlanTest {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
deleted file mode 100644
index 89a6640..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.types._
-
-class ColumnStatsSuite extends SparkFunSuite {
-  testColumnStats(classOf[BooleanColumnStats], BOOLEAN, createRow(true, false, 0))
-  testColumnStats(classOf[ByteColumnStats], BYTE, createRow(Byte.MaxValue, Byte.MinValue, 0))
-  testColumnStats(classOf[ShortColumnStats], SHORT, createRow(Short.MaxValue, Short.MinValue, 0))
-  testColumnStats(classOf[IntColumnStats], INT, createRow(Int.MaxValue, Int.MinValue, 0))
-  testColumnStats(classOf[LongColumnStats], LONG, createRow(Long.MaxValue, Long.MinValue, 0))
-  testColumnStats(classOf[FloatColumnStats], FLOAT, createRow(Float.MaxValue, Float.MinValue, 0))
-  testColumnStats(classOf[DoubleColumnStats], DOUBLE,
-    createRow(Double.MaxValue, Double.MinValue, 0))
-  testColumnStats(classOf[StringColumnStats], STRING, createRow(null, null, 0))
-  testDecimalColumnStats(createRow(null, null, 0))
-
-  def createRow(values: Any*): GenericInternalRow = new GenericInternalRow(values.toArray)
-
-  def testColumnStats[T <: AtomicType, U <: ColumnStats](
-      columnStatsClass: Class[U],
-      columnType: NativeColumnType[T],
-      initialStatistics: GenericInternalRow): Unit = {
-
-    val columnStatsName = columnStatsClass.getSimpleName
-
-    test(s"$columnStatsName: empty") {
-      val columnStats = columnStatsClass.newInstance()
-      columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach {
-        case (actual, expected) => assert(actual === expected)
-      }
-    }
-
-    test(s"$columnStatsName: non-empty") {
-      import org.apache.spark.sql.columnar.ColumnarTestUtils._
-
-      val columnStats = columnStatsClass.newInstance()
-      val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1))
-      rows.foreach(columnStats.gatherStats(_, 0))
-
-      val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType])
-      val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]]
-      val stats = columnStats.collectedStatistics
-
-      assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0))
-      assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1))
-      assertResult(10, "Wrong null count")(stats.values(2))
-      assertResult(20, "Wrong row count")(stats.values(3))
-      assertResult(stats.values(4), "Wrong size in bytes") {
-        rows.map { row =>
-          if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0)
-        }.sum
-      }
-    }
-  }
-
-  def testDecimalColumnStats[T <: AtomicType, U <: ColumnStats](
-      initialStatistics: GenericInternalRow): Unit = {
-
-    val columnStatsName = classOf[DecimalColumnStats].getSimpleName
-    val columnType = COMPACT_DECIMAL(15, 10)
-
-    test(s"$columnStatsName: empty") {
-      val columnStats = new DecimalColumnStats(15, 10)
-      columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach {
-        case (actual, expected) => assert(actual === expected)
-      }
-    }
-
-    test(s"$columnStatsName: non-empty") {
-      import org.apache.spark.sql.columnar.ColumnarTestUtils._
-
-      val columnStats = new DecimalColumnStats(15, 10)
-      val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1))
-      rows.foreach(columnStats.gatherStats(_, 0))
-
-      val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType])
-      val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]]
-      val stats = columnStats.collectedStatistics
-
-      assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0))
-      assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1))
-      assertResult(10, "Wrong null count")(stats.values(2))
-      assertResult(20, "Wrong row count")(stats.values(3))
-      assertResult(stats.values(4), "Wrong size in bytes") {
-        rows.map { row =>
-          if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0)
-        }.sum
-      }
-    }
-  }
-}
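
The assertions in the deleted ColumnStatsSuite pin down the shape of the statistics row every ColumnStats implementation reports: lower bound, upper bound, null count, total row count and size in bytes, in that order. As a rough standalone illustration of that bookkeeping for an INT column (a hypothetical class, not the Spark one; it only mirrors what the assertions check):

class IntStatsSketch {
  private var lower = Int.MaxValue
  private var upper = Int.MinValue
  private var nullCount = 0
  private var count = 0
  private var sizeInBytes = 0L

  // Mirrors the gatherStats calls above: track bounds, counts and the bytes
  // the value will occupy in the column buffer.
  def gather(value: Option[Int]): Unit = {
    count += 1
    value match {
      case Some(v) =>
        if (v < lower) lower = v
        if (v > upper) upper = v
        sizeInBytes += 4          // an INT value always occupies 4 bytes
      case None =>
        nullCount += 1
        sizeInBytes += 4          // the suite charges 4 bytes per null slot as well
    }
  }

  // Same slot order the tests read back: values(0) .. values(4).
  def collected: (Int, Int, Int, Int, Long) = (lower, upper, nullCount, count, sizeInBytes)
}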

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
deleted file mode 100644
index 63bc39b..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import java.nio.{ByteOrder, ByteBuffer}
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow}
-import org.apache.spark.sql.columnar.ColumnarTestUtils._
-import org.apache.spark.sql.types._
-import org.apache.spark.{Logging, SparkFunSuite}
-
-
-class ColumnTypeSuite extends SparkFunSuite with Logging {
-  private val DEFAULT_BUFFER_SIZE = 512
-  private val MAP_TYPE = MAP(MapType(IntegerType, StringType))
-  private val ARRAY_TYPE = ARRAY(ArrayType(IntegerType))
-  private val STRUCT_TYPE = STRUCT(StructType(StructField("a", StringType) :: Nil))
-
-  test("defaultSize") {
-    val checks = Map(
-      NULL-> 0, BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4, LONG -> 8,
-      FLOAT -> 4, DOUBLE -> 8, COMPACT_DECIMAL(15, 10) -> 8, LARGE_DECIMAL(20, 10) -> 12,
-      STRING -> 8, BINARY -> 16, STRUCT_TYPE -> 20, ARRAY_TYPE -> 16, MAP_TYPE -> 32)
-
-    checks.foreach { case (columnType, expectedSize) =>
-      assertResult(expectedSize, s"Wrong defaultSize for $columnType") {
-        columnType.defaultSize
-      }
-    }
-  }
-
-  test("actualSize") {
-    def checkActualSize(
-        columnType: ColumnType[_],
-        value: Any,
-        expected: Int): Unit = {
-
-      assertResult(expected, s"Wrong actualSize for $columnType") {
-        val row = new GenericMutableRow(1)
-        row.update(0, CatalystTypeConverters.convertToCatalyst(value))
-        val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
-        columnType.actualSize(proj(row), 0)
-      }
-    }
-
-    checkActualSize(NULL, null, 0)
-    checkActualSize(BOOLEAN, true, 1)
-    checkActualSize(BYTE, Byte.MaxValue, 1)
-    checkActualSize(SHORT, Short.MaxValue, 2)
-    checkActualSize(INT, Int.MaxValue, 4)
-    checkActualSize(LONG, Long.MaxValue, 8)
-    checkActualSize(FLOAT, Float.MaxValue, 4)
-    checkActualSize(DOUBLE, Double.MaxValue, 8)
-    checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
-    checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4)
-    checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
-    checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5)
-    checkActualSize(ARRAY_TYPE, Array[Any](1), 16)
-    checkActualSize(MAP_TYPE, Map(1 -> "a"), 29)
-    checkActualSize(STRUCT_TYPE, Row("hello"), 28)
-  }
-
-  testNativeColumnType(BOOLEAN)
-  testNativeColumnType(BYTE)
-  testNativeColumnType(SHORT)
-  testNativeColumnType(INT)
-  testNativeColumnType(LONG)
-  testNativeColumnType(FLOAT)
-  testNativeColumnType(DOUBLE)
-  testNativeColumnType(COMPACT_DECIMAL(15, 10))
-  testNativeColumnType(STRING)
-
-  testColumnType(NULL)
-  testColumnType(BINARY)
-  testColumnType(LARGE_DECIMAL(20, 10))
-  testColumnType(STRUCT_TYPE)
-  testColumnType(ARRAY_TYPE)
-  testColumnType(MAP_TYPE)
-
-  def testNativeColumnType[T <: AtomicType](columnType: NativeColumnType[T]): Unit = {
-    testColumnType[T#InternalType](columnType)
-  }
-
-  def testColumnType[JvmType](columnType: ColumnType[JvmType]): Unit = {
-
-    val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ByteOrder.nativeOrder())
-    val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
-    val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType)
-    val seq = (0 until 4).map(_ => proj(makeRandomRow(columnType)).copy())
-
-    test(s"$columnType append/extract") {
-      buffer.rewind()
-      seq.foreach(columnType.append(_, 0, buffer))
-
-      buffer.rewind()
-      seq.foreach { row =>
-        logInfo("buffer = " + buffer + ", expected = " + row)
-        val expected = converter(row.get(0, columnType.dataType))
-        val extracted = converter(columnType.extract(buffer))
-        assert(expected === extracted,
-          s"Extracted value didn't equal to the original one. $expected != $extracted, buffer =" +
-          dumpBuffer(buffer.duplicate().rewind().asInstanceOf[ByteBuffer]))
-      }
-    }
-  }
-
-  private def dumpBuffer(buff: ByteBuffer): Any = {
-    val sb = new StringBuilder()
-    while (buff.hasRemaining) {
-      val b = buff.get()
-      sb.append(Integer.toHexString(b & 0xff)).append(' ')
-    }
-    if (sb.nonEmpty) sb.setLength(sb.length - 1)
-    sb.toString()
-  }
-
-  test("column type for decimal types with different precision") {
-    (1 to 18).foreach { i =>
-      assertResult(COMPACT_DECIMAL(i, 0)) {
-        ColumnType(DecimalType(i, 0))
-      }
-    }
-
-    assertResult(LARGE_DECIMAL(19, 0)) {
-      ColumnType(DecimalType(19, 0))
-    }
-  }
-}
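
The append/extract tests above are round-trip checks: values written into a ByteBuffer through a ColumnType must come back in the same order and equal to the originals, and actualSize must agree with the bytes actually written (for STRING, a 4-byte length prefix plus the UTF-8 payload, hence 4 + "hello".length). A self-contained sketch of that round trip, without the Spark ColumnType classes (names here are illustrative only):

import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

object StringColumnSketch {
  // Write a string the way the suite's STRING size check implies:
  // a 4-byte length prefix followed by the UTF-8 payload.
  def append(value: String, buffer: ByteBuffer): Unit = {
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    buffer.putInt(bytes.length)
    buffer.put(bytes)
  }

  // Read the next value back, consuming exactly the bytes append() wrote.
  def extract(buffer: ByteBuffer): String = {
    val bytes = new Array[Byte](buffer.getInt())
    buffer.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    val buffer = ByteBuffer.allocate(512).order(ByteOrder.nativeOrder())
    val values = Seq("hello", "columnar", "spark")
    values.foreach(append(_, buffer))
    buffer.rewind()
    values.foreach(v => assert(extract(buffer) == v))  // round trip preserves order and content
  }
}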

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
deleted file mode 100644
index a5882f7..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import scala.collection.immutable.HashSet
-import scala.util.Random
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
-import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData}
-import org.apache.spark.sql.types.{AtomicType, Decimal}
-import org.apache.spark.unsafe.types.UTF8String
-
-object ColumnarTestUtils {
-  def makeNullRow(length: Int): GenericMutableRow = {
-    val row = new GenericMutableRow(length)
-    (0 until length).foreach(row.setNullAt)
-    row
-  }
-
-  def makeRandomValue[JvmType](columnType: ColumnType[JvmType]): JvmType = {
-    def randomBytes(length: Int) = {
-      val bytes = new Array[Byte](length)
-      Random.nextBytes(bytes)
-      bytes
-    }
-
-    (columnType match {
-      case NULL => null
-      case BOOLEAN => Random.nextBoolean()
-      case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
-      case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
-      case INT => Random.nextInt()
-      case LONG => Random.nextLong()
-      case FLOAT => Random.nextFloat()
-      case DOUBLE => Random.nextDouble()
-      case STRING => UTF8String.fromString(Random.nextString(Random.nextInt(32)))
-      case BINARY => randomBytes(Random.nextInt(32))
-      case COMPACT_DECIMAL(precision, scale) => Decimal(Random.nextLong() % 100, precision, scale)
-      case LARGE_DECIMAL(precision, scale) => Decimal(Random.nextLong(), precision, scale)
-      case STRUCT(_) =>
-        new GenericInternalRow(Array[Any](UTF8String.fromString(Random.nextString(10))))
-      case ARRAY(_) =>
-        new GenericArrayData(Array[Any](Random.nextInt(), Random.nextInt()))
-      case MAP(_) =>
-        ArrayBasedMapData(
-          Map(Random.nextInt() -> UTF8String.fromString(Random.nextString(Random.nextInt(32)))))
-    }).asInstanceOf[JvmType]
-  }
-
-  def makeRandomValues(
-      head: ColumnType[_],
-      tail: ColumnType[_]*): Seq[Any] = makeRandomValues(Seq(head) ++ tail)
-
-  def makeRandomValues(columnTypes: Seq[ColumnType[_]]): Seq[Any] = {
-    columnTypes.map(makeRandomValue(_))
-  }
-
-  def makeUniqueRandomValues[JvmType](
-      columnType: ColumnType[JvmType],
-      count: Int): Seq[JvmType] = {
-
-    Iterator.iterate(HashSet.empty[JvmType]) { set =>
-      set + Iterator.continually(makeRandomValue(columnType)).filterNot(set.contains).next()
-    }.drop(count).next().toSeq
-  }
-
-  def makeRandomRow(
-      head: ColumnType[_],
-      tail: ColumnType[_]*): InternalRow = makeRandomRow(Seq(head) ++ tail)
-
-  def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = {
-    val row = new GenericMutableRow(columnTypes.length)
-    makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) =>
-      row(index) = value
-    }
-    row
-  }
-
-  def makeUniqueValuesAndSingleValueRows[T <: AtomicType](
-      columnType: NativeColumnType[T],
-      count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = {
-
-    val values = makeUniqueRandomValues(columnType, count)
-    val rows = values.map { value =>
-      val row = new GenericMutableRow(1)
-      row(0) = value
-      row
-    }
-
-    (values, rows)
-  }
-}
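
makeUniqueRandomValues above leans on Iterator.iterate to grow a HashSet until it holds `count` distinct random values. For readers unpicking that idiom, an equivalent loop-based formulation (specialised to Int for brevity; purely illustrative, not part of the Spark utilities):

import scala.collection.mutable
import scala.util.Random

object UniqueRandomSketch {
  // Keep drawing, discard duplicates, stop once `count` distinct values are seen.
  def uniqueRandomInts(count: Int): Seq[Int] = {
    val seen = mutable.LinkedHashSet.empty[Int]
    while (seen.size < count) {
      seen += Random.nextInt()
    }
    seen.toSeq
  }
}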

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
deleted file mode 100644
index 6265e40..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import java.sql.{Date, Timestamp}
-
-import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.test.SQLTestData._
-import org.apache.spark.sql.types._
-import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
-
-class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
-  import testImplicits._
-
-  setupTestData()
-
-  test("simple columnar query") {
-    val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
-
-    checkAnswer(scan, testData.collect().toSeq)
-  }
-
-  test("default size avoids broadcast") {
-    // TODO: Improve this test when we have better statistics
-    sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString))
-      .toDF().registerTempTable("sizeTst")
-    sqlContext.cacheTable("sizeTst")
-    assert(
-      sqlContext.table("sizeTst").queryExecution.analyzed.statistics.sizeInBytes >
-        sqlContext.conf.autoBroadcastJoinThreshold)
-  }
-
-  test("projection") {
-    val plan = sqlContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
-
-    checkAnswer(scan, testData.collect().map {
-      case Row(key: Int, value: String) => value -> key
-    }.map(Row.fromTuple))
-  }
-
-  test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
-    val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
-
-    checkAnswer(scan, testData.collect().toSeq)
-    checkAnswer(scan, testData.collect().toSeq)
-  }
-
-  test("SPARK-1678 regression: compression must not lose repeated values") {
-    checkAnswer(
-      sql("SELECT * FROM repeatedData"),
-      repeatedData.collect().toSeq.map(Row.fromTuple))
-
-    sqlContext.cacheTable("repeatedData")
-
-    checkAnswer(
-      sql("SELECT * FROM repeatedData"),
-      repeatedData.collect().toSeq.map(Row.fromTuple))
-  }
-
-  test("with null values") {
-    checkAnswer(
-      sql("SELECT * FROM nullableRepeatedData"),
-      nullableRepeatedData.collect().toSeq.map(Row.fromTuple))
-
-    sqlContext.cacheTable("nullableRepeatedData")
-
-    checkAnswer(
-      sql("SELECT * FROM nullableRepeatedData"),
-      nullableRepeatedData.collect().toSeq.map(Row.fromTuple))
-  }
-
-  test("SPARK-2729 regression: timestamp data type") {
-    val timestamps = (0 to 3).map(i => Tuple1(new Timestamp(i))).toDF("time")
-    timestamps.registerTempTable("timestamps")
-
-    checkAnswer(
-      sql("SELECT time FROM timestamps"),
-      timestamps.collect().toSeq)
-
-    sqlContext.cacheTable("timestamps")
-
-    checkAnswer(
-      sql("SELECT time FROM timestamps"),
-      timestamps.collect().toSeq)
-  }
-
-  test("SPARK-3320 regression: batched column buffer building should work with empty partitions") {
-    checkAnswer(
-      sql("SELECT * FROM withEmptyParts"),
-      withEmptyParts.collect().toSeq.map(Row.fromTuple))
-
-    sqlContext.cacheTable("withEmptyParts")
-
-    checkAnswer(
-      sql("SELECT * FROM withEmptyParts"),
-      withEmptyParts.collect().toSeq.map(Row.fromTuple))
-  }
-
-  test("SPARK-4182 Caching complex types") {
-    complexData.cache().count()
-    // Shouldn't throw
-    complexData.count()
-    complexData.unpersist()
-  }
-
-  test("decimal type") {
-    // Casting is required here because ScalaReflection can't capture decimal precision information.
-    val df = (1 to 10)
-      .map(i => Tuple1(Decimal(i, 15, 10)))
-      .toDF("dec")
-      .select($"dec" cast DecimalType(15, 10))
-
-    assert(df.schema.head.dataType === DecimalType(15, 10))
-
-    df.cache().registerTempTable("test_fixed_decimal")
-    checkAnswer(
-      sql("SELECT * FROM test_fixed_decimal"),
-      (1 to 10).map(i => Row(Decimal(i, 15, 10).toJavaBigDecimal)))
-  }
-
-  test("test different data types") {
-    // Create the schema.
-    val struct =
-      StructType(
-        StructField("f1", FloatType, true) ::
-        StructField("f2", ArrayType(BooleanType), true) :: Nil)
-    val dataTypes =
-      Seq(StringType, BinaryType, NullType, BooleanType,
-        ByteType, ShortType, IntegerType, LongType,
-        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-        DateType, TimestampType,
-        ArrayType(IntegerType), MapType(StringType, LongType), struct)
-    val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-      StructField(s"col$index", dataType, true)
-    }
-    val allColumns = fields.map(_.name).mkString(",")
-    val schema = StructType(fields)
-
-    // Create a RDD for the schema
-    val rdd =
-      sparkContext.parallelize((1 to 10000), 10).map { i =>
-        Row(
-          s"str${i}: test cache.",
-          s"binary${i}: test cache.".getBytes("UTF-8"),
-          null,
-          i % 2 == 0,
-          i.toByte,
-          i.toShort,
-          i,
-          Long.MaxValue - i.toLong,
-          (i + 0.25).toFloat,
-          (i + 0.75),
-          BigDecimal(Long.MaxValue.toString + ".12345"),
-          new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"),
-          new Date(i),
-          new Timestamp(i * 1000000L),
-          (i to i + 10).toSeq,
-          (i to i + 10).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap,
-          Row((i - 0.25).toFloat, Seq(true, false, null)))
-      }
-    sqlContext.createDataFrame(rdd, schema).registerTempTable("InMemoryCache_different_data_types")
-    // Cache the table.
-    sql("cache table InMemoryCache_different_data_types")
-    // Make sure the table is indeed cached.
-    sqlContext.table("InMemoryCache_different_data_types").queryExecution.executedPlan
-    assert(
-      sqlContext.isCached("InMemoryCache_different_data_types"),
-      "InMemoryCache_different_data_types should be cached.")
-    // Issue a query and check the results.
-    checkAnswer(
-      sql(s"SELECT DISTINCT ${allColumns} FROM InMemoryCache_different_data_types"),
-      sqlContext.table("InMemoryCache_different_data_types").collect())
-    sqlContext.dropTempTable("InMemoryCache_different_data_types")
-  }
-
-  test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") {
-    val df = sqlContext.range(1, 100).selectExpr("id % 10 as id")
-      .rdd.map(id => Tuple1(s"str_$id")).toDF("i")
-    val cached = df.cache()
-    // count triggers the caching action. It should not throw.
-    cached.count()
-
-    // Make sure, the DataFrame is indeed cached.
-    assert(sqlContext.cacheManager.lookupCachedData(cached).nonEmpty)
-
-    // Check result.
-    checkAnswer(
-      cached,
-      sqlContext.range(1, 100).selectExpr("id % 10 as id")
-        .rdd.map(id => Tuple1(s"str_$id")).toDF("i")
-    )
-
-    // Drop the cache.
-    cached.unpersist()
-  }
-
-  test("SPARK-10859: Predicates pushed to InMemoryColumnarTableScan are not evaluated correctly") {
-    val data = sqlContext.range(10).selectExpr("id", "cast(id as string) as s")
-    data.cache()
-    assert(data.count() === 10)
-    assert(data.filter($"s" === "3").count() === 1)
-  }
-}
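
The query suite above exercises the user-facing caching path rather than the columnar internals directly. A minimal sketch of that path using the public Spark 1.x API already visible in the suite, assuming a live SQLContext named sqlContext (for example in spark-shell):

// Assumes `sqlContext` is in scope, as in spark-shell or the test harness.
import sqlContext.implicits._

val df = sqlContext.sparkContext
  .parallelize(1 to 100)
  .map(i => (i, s"val_$i"))
  .toDF("key", "value")
df.registerTempTable("cached_example")

sqlContext.cacheTable("cached_example")          // marks the plan for in-memory columnar caching
sqlContext.sql("SELECT COUNT(*) FROM cached_example").collect()            // first action materializes the cache
sqlContext.sql("SELECT value FROM cached_example WHERE key = 3").collect() // later scans read cached batches

assert(sqlContext.isCached("cached_example"))
sqlContext.uncacheTable("cached_example")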

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
deleted file mode 100644
index aa1605f..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow}
-import org.apache.spark.sql.types._
-
-class TestNullableColumnAccessor[JvmType](
-    buffer: ByteBuffer,
-    columnType: ColumnType[JvmType])
-  extends BasicColumnAccessor(buffer, columnType)
-  with NullableColumnAccessor
-
-object TestNullableColumnAccessor {
-  def apply[JvmType](buffer: ByteBuffer, columnType: ColumnType[JvmType])
-    : TestNullableColumnAccessor[JvmType] = {
-    new TestNullableColumnAccessor(buffer, columnType)
-  }
-}
-
-class NullableColumnAccessorSuite extends SparkFunSuite {
-  import org.apache.spark.sql.columnar.ColumnarTestUtils._
-
-  Seq(
-    NULL, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
-    STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10),
-    STRUCT(StructType(StructField("a", StringType) :: Nil)),
-    ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType)))
-    .foreach {
-    testNullableColumnAccessor(_)
-  }
-
-  def testNullableColumnAccessor[JvmType](
-      columnType: ColumnType[JvmType]): Unit = {
-
-    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
-    val nullRow = makeNullRow(1)
-
-    test(s"Nullable $typeName column accessor: empty column") {
-      val builder = TestNullableColumnBuilder(columnType)
-      val accessor = TestNullableColumnAccessor(builder.build(), columnType)
-      assert(!accessor.hasNext)
-    }
-
-    test(s"Nullable $typeName column accessor: access null values") {
-      val builder = TestNullableColumnBuilder(columnType)
-      val randomRow = makeRandomRow(columnType)
-      val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
-
-      (0 until 4).foreach { _ =>
-        builder.appendFrom(proj(randomRow), 0)
-        builder.appendFrom(proj(nullRow), 0)
-      }
-
-      val accessor = TestNullableColumnAccessor(builder.build(), columnType)
-      val row = new GenericMutableRow(1)
-      val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType)
-
-      (0 until 4).foreach { _ =>
-        assert(accessor.hasNext)
-        accessor.extractTo(row, 0)
-        assert(converter(row.get(0, columnType.dataType))
-          === converter(randomRow.get(0, columnType.dataType)))
-
-        assert(accessor.hasNext)
-        accessor.extractTo(row, 0)
-        assert(row.isNullAt(0))
-      }
-
-      assert(!accessor.hasNext)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
deleted file mode 100644
index 9140457..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow}
-import org.apache.spark.sql.types._
-
-class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType])
-  extends BasicColumnBuilder[JvmType](new NoopColumnStats, columnType)
-  with NullableColumnBuilder
-
-object TestNullableColumnBuilder {
-  def apply[JvmType](columnType: ColumnType[JvmType], initialSize: Int = 0)
-    : TestNullableColumnBuilder[JvmType] = {
-    val builder = new TestNullableColumnBuilder(columnType)
-    builder.initialize(initialSize)
-    builder
-  }
-}
-
-class NullableColumnBuilderSuite extends SparkFunSuite {
-  import org.apache.spark.sql.columnar.ColumnarTestUtils._
-
-  Seq(
-    BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
-    STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10),
-    STRUCT(StructType(StructField("a", StringType) :: Nil)),
-    ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType)))
-    .foreach {
-    testNullableColumnBuilder(_)
-  }
-
-  def testNullableColumnBuilder[JvmType](
-      columnType: ColumnType[JvmType]): Unit = {
-
-    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
-    val dataType = columnType.dataType
-    val proj = UnsafeProjection.create(Array[DataType](dataType))
-    val converter = CatalystTypeConverters.createToScalaConverter(dataType)
-
-    test(s"$typeName column builder: empty column") {
-      val columnBuilder = TestNullableColumnBuilder(columnType)
-      val buffer = columnBuilder.build()
-
-      assertResult(0, "Wrong null count")(buffer.getInt())
-      assert(!buffer.hasRemaining)
-    }
-
-    test(s"$typeName column builder: buffer size auto growth") {
-      val columnBuilder = TestNullableColumnBuilder(columnType)
-      val randomRow = makeRandomRow(columnType)
-
-      (0 until 4).foreach { _ =>
-        columnBuilder.appendFrom(proj(randomRow), 0)
-      }
-
-      val buffer = columnBuilder.build()
-
-      assertResult(0, "Wrong null count")(buffer.getInt())
-    }
-
-    test(s"$typeName column builder: null values") {
-      val columnBuilder = TestNullableColumnBuilder(columnType)
-      val randomRow = makeRandomRow(columnType)
-      val nullRow = makeNullRow(1)
-
-      (0 until 4).foreach { _ =>
-        columnBuilder.appendFrom(proj(randomRow), 0)
-        columnBuilder.appendFrom(proj(nullRow), 0)
-      }
-
-      val buffer = columnBuilder.build()
-
-      assertResult(4, "Wrong null count")(buffer.getInt())
-
-      // For null positions
-      (1 to 7 by 2).foreach(assertResult(_, "Wrong null position")(buffer.getInt()))
-
-      // For non-null values
-      val actual = new GenericMutableRow(new Array[Any](1))
-      (0 until 4).foreach { _ =>
-        columnType.extract(buffer, actual, 0)
-        assert(converter(actual.get(0, dataType)) === converter(randomRow.get(0, dataType)),
-          "Extracted value didn't equal to the original one")
-      }
-
-      assert(!buffer.hasRemaining)
-    }
-  }
-}
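
The builder and accessor suites above agree on one buffer layout: a header holding the null count and the null positions, followed by the serialized non-null values, which is why appending four (value, null) pairs yields a null count of 4 at positions 1, 3, 5 and 7. A standalone sketch of that layout for nullable ints (hypothetical helpers, not the Spark builder/accessor classes):

import java.nio.ByteBuffer

object NullableLayoutSketch {
  // Header: [null count][null positions...]; body: the non-null values only.
  def buildNullableInts(values: Seq[Option[Int]]): ByteBuffer = {
    val nullPositions = values.zipWithIndex.collect { case (None, i) => i }
    val buffer = ByteBuffer.allocate(4 + (nullPositions.size + values.size) * 4)
    buffer.putInt(nullPositions.size)
    nullPositions.foreach(p => buffer.putInt(p))
    values.flatten.foreach(v => buffer.putInt(v))
    buffer.flip()
    buffer
  }

  // Read the header first, then interleave nulls back in while scanning the body.
  def readNullableInts(buffer: ByteBuffer, numRows: Int): Seq[Option[Int]] = {
    val nullPositions = Seq.fill(buffer.getInt())(buffer.getInt()).toSet
    (0 until numRows).map { i =>
      if (nullPositions.contains(i)) None else Some(buffer.getInt())
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Some(10), None, Some(20), None)
    assert(readNullableInts(buildNullableInts(rows), rows.length) == rows)
  }
}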



