From commits-return-29910-archive-asf-public=cust-asf.ponee.io@spark.apache.org Wed Jan 24 01:17:36 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 26D05180621 for ; Wed, 24 Jan 2018 01:17:36 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 1619F160C4D; Wed, 24 Jan 2018 00:17:36 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5B93A160C3A for ; Wed, 24 Jan 2018 01:17:35 +0100 (CET) Received: (qmail 95027 invoked by uid 500); 24 Jan 2018 00:17:34 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 95018 invoked by uid 99); 24 Jan 2018 00:17:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Jan 2018 00:17:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5A79DE0BAE; Wed, 24 Jan 2018 00:17:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lixiao@apache.org To: commits@spark.apache.org Message-Id: <6cbb79ab587c48b785b03a01d3d45a4e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-23195][SQL] Keep the Hint of Cached Data Date: Wed, 24 Jan 2018 00:17:34 +0000 (UTC) Repository: spark Updated Branches: refs/heads/branch-2.3 851c30386 -> a23f6b13e [SPARK-23195][SQL] Keep the Hint of Cached Data ## What changes were proposed in this pull request? The broadcast hint on a plan is lost once the plan is cached and the cached data has been materialized, because the statistics are then rebuilt from the accumulated batch size alone. This PR corrects that by carrying the hints of the plan being cached over into the rebuilt statistics. 
```Scala val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value") val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value") broadcast(df2).cache() df2.collect() val df3 = df1.join(df2, Seq("key"), "inner") ``` ## How was this patch tested? Added a test. Author: gatorsmile Closes #20368 from gatorsmile/cachedBroadcastHint. (cherry picked from commit 44cc4daf3a03f1a220eef8ce3c86867745db9ab7) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a23f6b13 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a23f6b13 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a23f6b13 Branch: refs/heads/branch-2.3 Commit: a23f6b13e8a4f0471ee33879a14746786bbf0435 Parents: 851c303 Author: gatorsmile Authored: Tue Jan 23 16:17:09 2018 -0800 Committer: gatorsmile Committed: Tue Jan 23 16:17:27 2018 -0800 ---------------------------------------------------------------------- .../sql/execution/columnar/InMemoryRelation.scala | 4 ++-- .../sql/execution/joins/BroadcastJoinSuite.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a23f6b13/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 51928d9..5945808 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -63,7 +63,7 @@ case class InMemoryRelation( tableName: Option[String])( @transient var _cachedColumnBuffers: 
RDD[CachedBatch] = null, val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, - statsOfPlanToCache: Statistics = null) + statsOfPlanToCache: Statistics) extends logical.LeafNode with MultiInstanceRelation { override protected def innerChildren: Seq[SparkPlan] = Seq(child) @@ -77,7 +77,7 @@ case class InMemoryRelation( // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache statsOfPlanToCache } else { - Statistics(sizeInBytes = batchStats.value.longValue) + Statistics(sizeInBytes = batchStats.value.longValue, hints = statsOfPlanToCache.hints) } } http://git-wip-us.apache.org/repos/asf/spark/blob/a23f6b13/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala index 1704bc8..889cab0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala @@ -139,6 +139,22 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { } } + test("broadcast hint is retained in a cached plan") { + Seq(true, false).foreach { materialized => + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value") + val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value") + broadcast(df2).cache() + if (materialized) df2.collect() + val df3 = df1.join(df2, Seq("key"), "inner") + val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect { + case b: BroadcastHashJoinExec => b + }.size + assert(numBroadCastHashJoin === 1) + } + } + } + private def assertBroadcastJoin(df : Dataset[Row]) : Unit = { val df1 = 
spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value") val joined = df1.join(df, Seq("key"), "inner") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org