spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-20867][SQL] Move hints from Statistics into HintInfo class
Date Wed, 24 May 2017 20:57:32 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c59ad420b -> b7a2a16b1


[SPARK-20867][SQL] Move hints from Statistics into HintInfo class

## What changes were proposed in this pull request?
This is a follow-up to SPARK-20857 to move the broadcast hint from Statistics into a new HintInfo
class, so we can be more flexible in adding new hints in the future.

## How was this patch tested?
Updated test cases to reflect the change.

Author: Reynold Xin <rxin@databricks.com>

Closes #18087 from rxin/SPARK-20867.

(cherry picked from commit a64746677bf09ef67e3fd538355a6ee9b5ce8cf4)
Signed-off-by: Xiao Li <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7a2a16b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7a2a16b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7a2a16b

Branch: refs/heads/branch-2.2
Commit: b7a2a16b1e01375292938fc48b0a333ec4e7cd30
Parents: c59ad42
Author: Reynold Xin <rxin@databricks.com>
Authored: Wed May 24 13:57:19 2017 -0700
Committer: Xiao Li <gatorsmile@gmail.com>
Committed: Wed May 24 13:57:28 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/analysis/ResolveHints.scala    |  6 ++---
 .../catalyst/plans/logical/LogicalPlan.scala    |  2 +-
 .../sql/catalyst/plans/logical/Statistics.scala | 11 +++------
 .../plans/logical/basicLogicalOperators.scala   | 17 +++++++-------
 .../sql/catalyst/plans/logical/hints.scala      | 24 ++++++++++++++++----
 .../statsEstimation/AggregateEstimation.scala   |  2 +-
 .../catalyst/analysis/ResolveHintsSuite.scala   | 20 ++++++++--------
 .../BasicStatsEstimationSuite.scala             | 17 +++++++-------
 .../spark/sql/execution/SparkStrategies.scala   |  2 +-
 .../scala/org/apache/spark/sql/functions.scala  |  4 ++--
 .../spark/sql/StatisticsCollectionSuite.scala   |  2 +-
 11 files changed, 59 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index 9dfd84c..86c788a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -57,9 +57,9 @@ object ResolveHints {
       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
           case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table))
=>
-            ResolvedHint(plan, isBroadcastable = Option(true))
+            ResolvedHint(plan, HintInfo(isBroadcastable = Option(true)))
           case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
-            ResolvedHint(plan, isBroadcastable = Option(true))
+            ResolvedHint(plan, HintInfo(isBroadcastable = Option(true)))
 
           case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
             // Don't traverse down these nodes.
@@ -88,7 +88,7 @@ object ResolveHints {
       case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT))
=>
         if (h.parameters.isEmpty) {
           // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
-          ResolvedHint(h.child, isBroadcastable = Option(true))
+          ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true)))
         } else {
           // Otherwise, find within the subtree query plans that should be broadcasted.
           applyBroadcastHint(h.child, h.parameters.toSet)

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 6bdcf49..2ebb2ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -347,7 +347,7 @@ abstract class UnaryNode extends LogicalPlan {
     }
 
     // Don't propagate rowCount and attributeStats, since they are not estimated here.
-    Statistics(sizeInBytes = sizeInBytes, isBroadcastable = child.stats(conf).isBroadcastable)
+    Statistics(sizeInBytes = sizeInBytes, hints = child.stats(conf).hints)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 81bb374..a64562b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -46,13 +46,13 @@ import org.apache.spark.util.Utils
  *                    defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
  * @param attributeStats Statistics for Attributes.
- * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
+ * @param hints Query hints.
  */
 case class Statistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
     attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
-    isBroadcastable: Boolean = false) {
+    hints: HintInfo = HintInfo()) {
 
   override def toString: String = "Statistics(" + simpleString + ")"
 
@@ -65,14 +65,9 @@ case class Statistics(
       } else {
         ""
       },
-      s"isBroadcastable=$isBroadcastable"
+      s"hints=$hints"
     ).filter(_.nonEmpty).mkString(", ")
   }
-
-  /** Must be called when computing stats for a join operator to reset hints. */
-  def resetHintsForJoin(): Statistics = copy(
-    isBroadcastable = false
-  )
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index d7f56f6..2eee943 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -195,9 +195,9 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation
     val leftSize = left.stats(conf).sizeInBytes
     val rightSize = right.stats(conf).sizeInBytes
     val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
-    val isBroadcastable = left.stats(conf).isBroadcastable || right.stats(conf).isBroadcastable
-
-    Statistics(sizeInBytes = sizeInBytes, isBroadcastable = isBroadcastable)
+    Statistics(
+      sizeInBytes = sizeInBytes,
+      hints = left.stats(conf).hints.resetForJoin())
   }
 }
 
@@ -364,7 +364,8 @@ case class Join(
       case _ =>
         // Make sure we don't propagate isBroadcastable in other joins, because
         // they could explode the size.
-        super.computeStats(conf).resetHintsForJoin()
+        val stats = super.computeStats(conf)
+        stats.copy(hints = stats.hints.resetForJoin())
     }
 
     if (conf.cboEnabled) {
@@ -560,7 +561,7 @@ case class Aggregate(
         Statistics(
           sizeInBytes = EstimationUtils.getOutputSize(output, outputRowCount = 1),
           rowCount = Some(1),
-          isBroadcastable = child.stats(conf).isBroadcastable)
+          hints = child.stats(conf).hints)
       } else {
         super.computeStats(conf)
       }
@@ -749,7 +750,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
     Statistics(
       sizeInBytes = EstimationUtils.getOutputSize(output, rowCount, childStats.attributeStats),
       rowCount = Some(rowCount),
-      isBroadcastable = childStats.isBroadcastable)
+      hints = childStats.hints)
   }
 }
 
@@ -770,7 +771,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
       Statistics(
         sizeInBytes = 1,
         rowCount = Some(0),
-        isBroadcastable = childStats.isBroadcastable)
+        hints = childStats.hints)
     } else {
       // The output row count of LocalLimit should be the sum of row counts from each partition.
       // However, since the number of partitions is not available here, we just use statistics
of
@@ -821,7 +822,7 @@ case class Sample(
     }
     val sampledRowCount = childStats.rowCount.map(c => EstimationUtils.ceil(BigDecimal(c)
* ratio))
     // Don't propagate column stats, because we don't know the distribution after a sample
operation
-    Statistics(sizeInBytes, sampledRowCount, isBroadcastable = childStats.isBroadcastable)
+    Statistics(sizeInBytes, sampledRowCount, hints = childStats.hints)
   }
 
   override protected def otherCopyArgs: Seq[AnyRef] = isTableSample :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index 9bcbfbb..b96d7bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -35,15 +35,31 @@ case class UnresolvedHint(name: String, parameters: Seq[String], child: LogicalPlan)
 /**
  * A resolved hint node. The analyzer should convert all [[UnresolvedHint]] into [[ResolvedHint]].
  */
-case class ResolvedHint(
-    child: LogicalPlan,
-    isBroadcastable: Option[Boolean] = None)
+case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())
   extends UnaryNode {
 
   override def output: Seq[Attribute] = child.output
 
   override def computeStats(conf: SQLConf): Statistics = {
     val stats = child.stats(conf)
-    isBroadcastable.map(x => stats.copy(isBroadcastable = x)).getOrElse(stats)
+    stats.copy(hints = hints)
+  }
+}
+
+
+case class HintInfo(
+    isBroadcastable: Option[Boolean] = None) {
+
+  /** Must be called when computing stats for a join operator to reset hints. */
+  def resetForJoin(): HintInfo = copy(
+    isBroadcastable = None
+  )
+
+  override def toString: String = {
+    if (productIterator.forall(_.asInstanceOf[Option[_]].isEmpty)) {
+      "none"
+    } else {
+      isBroadcastable.map(x => s"isBroadcastable=$x").getOrElse("")
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index 48b5fbb..a0c2319 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -56,7 +56,7 @@ object AggregateEstimation {
         sizeInBytes = getOutputSize(agg.output, outputRows, outputAttrStats),
         rowCount = Some(outputRows),
         attributeStats = outputAttrStats,
-        isBroadcastable = childStats.isBroadcastable))
+        hints = childStats.hints))
     } else {
       None
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
index bb914e1..3d51480 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
@@ -36,17 +36,17 @@ class ResolveHintsSuite extends AnalysisTest {
   test("case-sensitive or insensitive parameters") {
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      ResolvedHint(testRelation, isBroadcastable = Option(true)),
+      ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
       caseSensitive = false)
 
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")),
-      ResolvedHint(testRelation, isBroadcastable = Option(true)),
+      ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
       caseSensitive = false)
 
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      ResolvedHint(testRelation, isBroadcastable = Option(true)),
+      ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
       caseSensitive = true)
 
     checkAnalysis(
@@ -58,28 +58,28 @@ class ResolveHintsSuite extends AnalysisTest {
   test("multiple broadcast hint aliases") {
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))),
-      Join(ResolvedHint(testRelation, isBroadcastable = Option(true)),
-        ResolvedHint(testRelation2, isBroadcastable = Option(true)), Inner, None),
+      Join(ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
+        ResolvedHint(testRelation2, HintInfo(isBroadcastable = Option(true))), Inner, None),
       caseSensitive = false)
   }
 
   test("do not traverse past existing broadcast hints") {
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("table"),
-        ResolvedHint(table("table").where('a > 1), isBroadcastable = Option(true))),
-      ResolvedHint(testRelation.where('a > 1), isBroadcastable = Option(true)).analyze,
+        ResolvedHint(table("table").where('a > 1), HintInfo(isBroadcastable = Option(true)))),
+      ResolvedHint(testRelation.where('a > 1), HintInfo(isBroadcastable = Option(true))).analyze,
       caseSensitive = false)
   }
 
   test("should work for subqueries") {
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
-      ResolvedHint(testRelation, isBroadcastable = Option(true)),
+      ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
       caseSensitive = false)
 
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
-      ResolvedHint(testRelation, isBroadcastable = Option(true)),
+      ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
       caseSensitive = false)
 
     // Negative case: if the alias doesn't match, don't match the original table name.
@@ -104,7 +104,7 @@ class ResolveHintsSuite extends AnalysisTest {
           |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
         """.stripMargin
       ),
-      ResolvedHint(testRelation.where('a > 1).select('a), isBroadcastable = Option(true))
+      ResolvedHint(testRelation.where('a > 1).select('a), HintInfo(isBroadcastable = Option(true)))
         .select('a).analyze,
       caseSensitive = false)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index 81b91e6..2afea6d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -37,19 +37,20 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase {
 
   test("BroadcastHint estimation") {
     val filter = Filter(Literal(true), plan)
-    val filterStatsCboOn = Statistics(sizeInBytes = 10 * (8 +4), isBroadcastable = false,
+    val filterStatsCboOn = Statistics(sizeInBytes = 10 * (8 +4),
       rowCount = Some(10), attributeStats = AttributeMap(Seq(attribute -> colStat)))
-    val filterStatsCboOff = Statistics(sizeInBytes = 10 * (8 +4), isBroadcastable = false)
+    val filterStatsCboOff = Statistics(sizeInBytes = 10 * (8 +4))
     checkStats(
       filter,
       expectedStatsCboOn = filterStatsCboOn,
       expectedStatsCboOff = filterStatsCboOff)
 
-    val broadcastHint = ResolvedHint(filter, isBroadcastable = Option(true))
+    val broadcastHint = ResolvedHint(filter, HintInfo(isBroadcastable = Option(true)))
     checkStats(
       broadcastHint,
-      expectedStatsCboOn = filterStatsCboOn.copy(isBroadcastable = true),
-      expectedStatsCboOff = filterStatsCboOff.copy(isBroadcastable = true))
+      expectedStatsCboOn = filterStatsCboOn.copy(hints = HintInfo(isBroadcastable = Option(true))),
+      expectedStatsCboOff = filterStatsCboOff.copy(hints = HintInfo(isBroadcastable = Option(true)))
+    )
   }
 
   test("limit estimation: limit < child's rowCount") {
@@ -94,15 +95,13 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase {
         sizeInBytes = 40,
         rowCount = Some(10),
         attributeStats = AttributeMap(Seq(
-          AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10),
0, 4, 4))),
-        isBroadcastable = false)
+          AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10),
0, 4, 4))))
     val expectedCboStats =
       Statistics(
         sizeInBytes = 4,
         rowCount = Some(1),
         attributeStats = AttributeMap(Seq(
-          AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0,
4, 4))),
-        isBroadcastable = false)
+          AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0,
4, 4))))
 
     val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
     checkStats(

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5981b49..843ce63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -114,7 +114,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      * Matches a plan whose output should be small enough to be used in broadcast join.
      */
     private def canBroadcast(plan: LogicalPlan): Boolean = {
-      plan.stats(conf).isBroadcastable ||
+      plan.stats(conf).hints.isBroadcastable.getOrElse(false) ||
         (plan.stats(conf).sizeInBytes >= 0 &&
           plan.stats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index d2256a0..a4f79fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.ResolvedHint
+import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, ResolvedHint}
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.internal.SQLConf
@@ -1020,7 +1020,7 @@ object functions {
    */
   def broadcast[T](df: Dataset[T]): Dataset[T] = {
     Dataset[T](df.sparkSession,
-      ResolvedHint(df.logicalPlan, isBroadcastable = Option(true)))(df.exprEnc)
+      ResolvedHint(df.logicalPlan, HintInfo(isBroadcastable = Option(true))))(df.exprEnc)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index ddc393c..601324f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -164,7 +164,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSQLContext {
     numbers.foreach { case (input, (expectedSize, expectedRows)) =>
       val stats = Statistics(sizeInBytes = input, rowCount = Some(input))
       val expectedString = s"sizeInBytes=$expectedSize, rowCount=$expectedRows," +
-        s" isBroadcastable=${stats.isBroadcastable}"
+        s" hints=none"
       assert(stats.simpleString == expectedString)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message