spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: [SPARK-2405][SQL] Reuse same byte buffers when creating new instance of InMemoryRelation
Date Sat, 12 Jul 2014 19:13:47 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 37e49433a -> effa69f9c


[SPARK-2405][SQL] Reuse same byte buffers when creating new instance of InMemoryRelation

Reuse byte buffers when creating unique attributes for multiple instances of an InMemoryRelation
in a single query plan.

Author: Michael Armbrust <michael@databricks.com>

Closes #1332 from marmbrus/doubleCache and squashes the following commits:

4a19609 [Michael Armbrust] Clean up concurrency story by calculating buffers in the constructor.
b39c931 [Michael Armbrust] Allocations are kind of a side effect.
f67eff7 [Michael Armbrust] Reuse same byte buffers when creating new instance of InMemoryRelation

(cherry picked from commit 1a7d7cc85fb24de21f1cde67d04467171b82e845)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/effa69f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/effa69f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/effa69f9

Branch: refs/heads/branch-1.0
Commit: effa69f9cd6abaafee2c437a3f86cec867855ede
Parents: 37e4943
Author: Michael Armbrust <michael@databricks.com>
Authored: Sat Jul 12 12:13:32 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Sat Jul 12 12:13:44 2014 -0700

----------------------------------------------------------------------
 .../analysis/MultiInstanceRelation.scala        |  2 +-
 .../columnar/InMemoryColumnarTableScan.scala    | 35 ++++++++++++++------
 2 files changed, 25 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/effa69f9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index a6ce908..22941ed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  * of itself with globally unique expression ids.
  */
 trait MultiInstanceRelation {
-  def newInstance: this.type
+  def newInstance(): this.type
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/effa69f9/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index e1e4f24..ff7f664 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.columnar
 
+import java.nio.ByteBuffer
+
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -26,22 +29,19 @@ import org.apache.spark.SparkConf
 
 object InMemoryRelation {
   def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, child)
+    new InMemoryRelation(child.output, useCompression, child)()
 }
 
 private[sql] case class InMemoryRelation(
     output: Seq[Attribute],
     useCompression: Boolean,
     child: SparkPlan)
+    (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
   extends LogicalPlan with MultiInstanceRelation {
 
-  override def children = Seq.empty
-  override def references = Set.empty
-
-  override def newInstance() =
-    new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type]
-
-  lazy val cachedColumnBuffers = {
+  // If the cached column buffers were not passed in, we calculate them in the constructor.
+  // As in Spark, the actual work of caching is lazy.
+  if (_cachedColumnBuffers == null) {
     val output = child.output
     val cached = child.execute().mapPartitions { iterator =>
       val columnBuilders = output.map { attribute =>
@@ -62,10 +62,23 @@ private[sql] case class InMemoryRelation(
     }.cache()
 
     cached.setName(child.toString)
-    // Force the materialization of the cached RDD.
-    cached.count()
-    cached
+    _cachedColumnBuffers = cached
   }
+
+
+  override def children = Seq.empty
+
+  override def references = Set.empty
+
+  override def newInstance() = {
+    new InMemoryRelation(
+      output.map(_.newInstance),
+      useCompression,
+      child)(
+      _cachedColumnBuffers).asInstanceOf[this.type]
+  }
+
+  def cachedColumnBuffers = _cachedColumnBuffers
 }
 
 private[sql] case class InMemoryColumnarTableScan(


Mime
View raw message