spark-commits mailing list archives

From marmb...@apache.org
Subject git commit: [SPARK-2650][SQL] Build column buffers in smaller batches
Date Tue, 12 Aug 2014 03:22:02 GMT
Repository: spark
Updated Branches:
  refs/heads/master c686b7dd4 -> bad21ed08


[SPARK-2650][SQL] Build column buffers in smaller batches

Author: Michael Armbrust <michael@databricks.com>

Closes #1880 from marmbrus/columnBatches and squashes the following commits:

0649987 [Michael Armbrust] add test
4756fad [Michael Armbrust] fix compilation
2314532 [Michael Armbrust] Build column buffers in smaller batches
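
The gist of the change: instead of draining an entire partition into a single
set of column builders (one potentially huge Array[ByteBuffer] per partition),
the caching path now wraps the partition's row iterator in a batching iterator
that emits one buffer set per batchSize rows, bounding peak memory while the
buffers are built. A minimal standalone sketch of the grouping idea, using
stand-in types rather than Spark's (conceptually this is Iterator.grouped; the
commit hand-rolls the loop so rows are appended straight into column builders
without an intermediate Seq):

    // Sketch only, not the commit's code: group a partition's rows into
    // bounded batches so peak memory is ~batchSize rows at a time.
    object BatchingSketch {
      def main(args: Array[String]): Unit = {
        val batchSize = 1000
        val rows = Iterator.range(0, 3500)     // stand-in for a partition's rows
        val batches = rows.grouped(batchSize)  // at most batchSize rows per batch
        batches.foreach(b => println(s"built buffers for ${b.size} rows"))
        // prints 1000, 1000, 1000, 500 -- four bounded batches instead of one of 3500
      }
    }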


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bad21ed0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bad21ed0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bad21ed0

Branch: refs/heads/master
Commit: bad21ed085a505559dccc06223b486170371ddd2
Parents: c686b7d
Author: Michael Armbrust <michael@databricks.com>
Authored: Mon Aug 11 20:21:56 2014 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Mon Aug 11 20:21:56 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  4 ++
 .../scala/org/apache/spark/sql/SQLContext.scala |  4 +-
 .../columnar/InMemoryColumnarTableScan.scala    | 76 ++++++++++++--------
 .../org/apache/spark/sql/CachedTableSuite.scala | 12 +++-
 .../columnar/InMemoryColumnarQuerySuite.scala   |  6 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  2 +-
 7 files changed, 70 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bad21ed0/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 0fd7aaa..35c51de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -25,6 +25,7 @@ import java.util.Properties
 
 private[spark] object SQLConf {
   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
+  val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
   val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
   val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
@@ -71,6 +72,9 @@ trait SQLConf {
   /** When true tables cached using the in-memory columnar caching will be compressed. */
   private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean
 
+  /** The number of rows that will be grouped together as a single batch when caching in-memory columnar data. */
+  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
+
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bad21ed0/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 71d338d..af9f7c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -273,7 +273,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
         currentTable.logicalPlan
 
       case _ =>
-        InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
+        InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
     }
 
     catalog.registerTable(None, tableName, asInMemoryRelation)
@@ -284,7 +284,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     table(tableName).queryExecution.analyzed match {
      // This is kind of a hack to make sure that if this was just an RDD registered as a table,
       // we reregister the RDD as a table.
-      case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
+      case inMem @ InMemoryRelation(_, _, _, e: ExistingRdd) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
         catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))

http://git-wip-us.apache.org/repos/asf/spark/blob/bad21ed0/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 88901de..3364d0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -28,13 +28,14 @@ import org.apache.spark.sql.Row
 import org.apache.spark.SparkConf
 
 object InMemoryRelation {
-  def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, child)()
+  def apply(useCompression: Boolean, batchSize: Int, child: SparkPlan): InMemoryRelation =
+    new InMemoryRelation(child.output, useCompression, batchSize, child)()
 }
 
 private[sql] case class InMemoryRelation(
     output: Seq[Attribute],
     useCompression: Boolean,
+    batchSize: Int,
     child: SparkPlan)
     (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
   extends LogicalPlan with MultiInstanceRelation {
@@ -43,22 +44,31 @@ private[sql] case class InMemoryRelation(
   // As in Spark, the actual work of caching is lazy.
   if (_cachedColumnBuffers == null) {
     val output = child.output
-    val cached = child.execute().mapPartitions { iterator =>
-      val columnBuilders = output.map { attribute =>
-        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
-      }.toArray
-
-      var row: Row = null
-      while (iterator.hasNext) {
-        row = iterator.next()
-        var i = 0
-        while (i < row.length) {
-          columnBuilders(i).appendFrom(row, i)
-          i += 1
+    val cached = child.execute().mapPartitions { baseIterator =>
+      new Iterator[Array[ByteBuffer]] {
+        def next() = {
+          val columnBuilders = output.map { attribute =>
+            ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
+          }.toArray
+
+          var row: Row = null
+          var rowCount = 0
+
+          while (baseIterator.hasNext && rowCount < batchSize) {
+            row = baseIterator.next()
+            var i = 0
+            while (i < row.length) {
+              columnBuilders(i).appendFrom(row, i)
+              i += 1
+            }
+            rowCount += 1
+          }
+
+          columnBuilders.map(_.build())
         }
-      }
 
-      Iterator.single(columnBuilders.map(_.build()))
+        def hasNext = baseIterator.hasNext
+      }
     }.cache()
 
     cached.setName(child.toString)
@@ -74,6 +84,7 @@ private[sql] case class InMemoryRelation(
     new InMemoryRelation(
       output.map(_.newInstance),
       useCompression,
+      batchSize,
       child)(
       _cachedColumnBuffers).asInstanceOf[this.type]
   }
@@ -90,22 +101,31 @@ private[sql] case class InMemoryColumnarTableScan(
 
   override def execute() = {
     relation.cachedColumnBuffers.mapPartitions { iterator =>
-      val columnBuffers = iterator.next()
-      assert(!iterator.hasNext)
+      // Find the ordinals of the requested columns.  If none are requested, use the first.
+      val requestedColumns =
+        if (attributes.isEmpty) {
+          Seq(0)
+        } else {
+          attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
+        }
 
       new Iterator[Row] {
-        // Find the ordinals of the requested columns.  If none are requested, use the first.
-        val requestedColumns =
-          if (attributes.isEmpty) {
-            Seq(0)
-          } else {
-            attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
-          }
+        private[this] var columnBuffers: Array[ByteBuffer] = null
+        private[this] var columnAccessors: Seq[ColumnAccessor] = null
+        nextBatch()
+
+        private[this] val nextRow = new GenericMutableRow(columnAccessors.length)
 
-        val columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
-        val nextRow = new GenericMutableRow(columnAccessors.length)
+        def nextBatch() = {
+          columnBuffers = iterator.next()
+          columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
+        }
 
         override def next() = {
+          if (!columnAccessors.head.hasNext) {
+            nextBatch()
+          }
+
           var i = 0
           while (i < nextRow.length) {
             columnAccessors(i).extractTo(nextRow, i)
@@ -114,7 +134,7 @@ private[sql] case class InMemoryColumnarTableScan(
           nextRow
         }
 
-        override def hasNext = columnAccessors.head.hasNext
+        override def hasNext = columnAccessors.head.hasNext || iterator.hasNext
       }
     }
   }
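
One subtlety in the scan-side iterator above: with multiple batches per
partition, hasNext must look past the current (possibly exhausted) batch to
the underlying batch iterator, and next() must advance to the next batch on
demand. A standalone sketch of that pattern with stand-in names (like the
commit's code, it assumes every batch yields at least one element):

    // Not Spark's API; mirrors the hasNext/nextBatch() structure above.
    class FlatteningIterator[T](batches: Iterator[Iterator[T]]) extends Iterator[T] {
      private[this] var current: Iterator[T] = Iterator.empty

      // `columnAccessors.head.hasNext || iterator.hasNext` in the real code.
      def hasNext: Boolean = current.hasNext || batches.hasNext

      def next(): T = {
        if (!current.hasNext) current = batches.next()  // nextBatch() in the real code
        current.next()
      }
    }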

http://git-wip-us.apache.org/repos/asf/spark/blob/bad21ed0/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index fbf9bd9..befef46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -22,9 +22,19 @@ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 
+case class BigData(s: String)
+
 class CachedTableSuite extends QueryTest {
   TestData // Load test tables.
 
+  test("too big for memory") {
+    val data = "*" * 10000
+    sparkContext.parallelize(1 to 1000000, 1).map(_ => BigData(data)).registerTempTable("bigData")
+    cacheTable("bigData")
+    assert(table("bigData").count() === 1000000L)
+    uncacheTable("bigData")
+  }
+
   test("SPARK-1669: cacheTable should be idempotent") {
     assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
 
@@ -37,7 +47,7 @@ class CachedTableSuite extends QueryTest {
 
     cacheTable("testData")
     table("testData").queryExecution.analyzed match {
-      case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
+      case InMemoryRelation(_, _, _, _: InMemoryColumnarTableScan) =>
         fail("cacheTable is not idempotent")
 
       case _ =>
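
Back-of-envelope math on why the "too big for memory" test above needs
batching: all million rows land in a single partition, and the raw character
data alone dwarfs a typical test JVM heap, so materializing the partition's
buffers in one shot (the old behavior) would run out of memory, while
1000-row batches are built and handed off roughly 20 MB at a time:

    // Rough size of the test data, ignoring per-object overhead:
    val rows = 1000000L
    val charsPerRow = 10000L               // "*" * 10000
    val bytesPerChar = 2L                  // JVM strings are UTF-16 in memory
    val totalBytes = rows * charsPerRow * bytesPerChar      // ~20 GB of characters
    val perBatchBytes = 1000L * charsPerRow * bytesPerChar  // ~20 MB per 1000-row batch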

http://git-wip-us.apache.org/repos/asf/spark/blob/bad21ed0/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index b561b44..736c0f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("simple columnar query") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, plan)
 
     checkAnswer(scan, testData.collect().toSeq)
   }
 
   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, plan)
 
     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times")
{
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = InMemoryRelation(useCompression = true, plan)
+    val scan = InMemoryRelation(useCompression = true, 5, plan)
 
     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
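
(The batch size of 5 in these tests appears chosen so that the test data spans
many batches, exercising the batch-boundary logic in each query.)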

http://git-wip-us.apache.org/repos/asf/spark/blob/bad21ed0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 82e9c1a..3b37121 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -137,7 +137,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         castChildOutput(p, table, child)
 
       case p @ logical.InsertIntoTable(
-                 InMemoryRelation(_, _,
+                 InMemoryRelation(_, _, _,
                    HiveTableScan(_, table, _)), _, child, _) =>
         castChildOutput(p, table, child)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bad21ed0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 85d2496..5fcc1bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -45,7 +45,7 @@ private[hive] trait HiveStrategies {
      case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
        InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.InsertIntoTable(
-             InMemoryRelation(_, _,
+             InMemoryRelation(_, _, _,
                HiveTableScan(_, table, _)), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case _ => Nil
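
The Hive-side edits in the last two files are purely mechanical: adding
batchSize to the InMemoryRelation case class changes the arity of its
auto-generated extractor, so every pattern match over it gains a wildcard.
A minimal illustration with a stand-in case class:

    // Stand-in classes, not Spark's: adding a field changes extractor arity,
    // so existing patterns stop compiling until they grow a wildcard.
    case class Relation(output: String, useCompression: Boolean, batchSize: Int, child: String)

    def describe(plan: Any): String = plan match {
      case Relation(_, _, _, child) => s"in-memory relation over $child"  // was (_, _, child)
      case _                        => "something else"
    }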


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

