spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-23195][SQL] Keep the Hint of Cached Data
Date Wed, 24 Jan 2018 00:17:34 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 851c30386 -> a23f6b13e


[SPARK-23195][SQL] Keep the Hint of Cached Data

## What changes were proposed in this pull request?
The broadcast hint of a plan is lost once the plan is cached. This PR fixes that so the hint is
retained on the cached data.

```Scala
  val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
  val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
  broadcast(df2).cache()
  df2.collect()
  val df3 = df1.join(df2, Seq("key"), "inner")
```

## How was this patch tested?
Added a test.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20368 from gatorsmile/cachedBroadcastHint.

(cherry picked from commit 44cc4daf3a03f1a220eef8ce3c86867745db9ab7)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a23f6b13
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a23f6b13
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a23f6b13

Branch: refs/heads/branch-2.3
Commit: a23f6b13e8a4f0471ee33879a14746786bbf0435
Parents: 851c303
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Tue Jan 23 16:17:09 2018 -0800
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Tue Jan 23 16:17:27 2018 -0800

----------------------------------------------------------------------
 .../sql/execution/columnar/InMemoryRelation.scala   |  4 ++--
 .../sql/execution/joins/BroadcastJoinSuite.scala    | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a23f6b13/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 51928d9..5945808 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -63,7 +63,7 @@ case class InMemoryRelation(
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
     val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics = null)
+    statsOfPlanToCache: Statistics)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
@@ -77,7 +77,7 @@ case class InMemoryRelation(
       // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache
       statsOfPlanToCache
     } else {
-      Statistics(sizeInBytes = batchStats.value.longValue)
+      Statistics(sizeInBytes = batchStats.value.longValue, hints = statsOfPlanToCache.hints)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a23f6b13/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 1704bc8..889cab0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -139,6 +139,22 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("broadcast hint is retained in a cached plan") {
+    Seq(true, false).foreach { materialized =>
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
+        val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
+        broadcast(df2).cache()
+        if (materialized) df2.collect()
+        val df3 = df1.join(df2, Seq("key"), "inner")
+        val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
+          case b: BroadcastHashJoinExec => b
+        }.size
+        assert(numBroadCastHashJoin === 1)
+      }
+    }
+  }
+
   private def assertBroadcastJoin(df : Dataset[Row]) : Unit = {
     val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
     val joined = df1.join(df, Seq("key"), "inner")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message