spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hvanhov...@apache.org
Subject spark git commit: [SPARK-20567] Lazily bind in GenerateExec
Date Wed, 03 May 2017 05:44:31 GMT
Repository: spark
Updated Branches:
  refs/heads/master b946f3160 -> 6235132a8


[SPARK-20567] Lazily bind in GenerateExec

It is not valid to eagerly bind with the child's output, as doing so causes failures when we
attempt to canonicalize the plan (replacing the attribute references with dummies).

Author: Michael Armbrust <michael@databricks.com>

Closes #17838 from marmbrus/fixBindExplode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6235132a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6235132a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6235132a

Branch: refs/heads/master
Commit: 6235132a8ce64bb12d825d0a65e5dd052d1ee647
Parents: b946f31
Author: Michael Armbrust <michael@databricks.com>
Authored: Tue May 2 22:44:27 2017 -0700
Committer: Herman van Hovell <hvanhovell@databricks.com>
Committed: Tue May 2 22:44:27 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/GenerateExec.scala   |  2 +-
 .../sql/streaming/StreamingAggregationSuite.scala   | 16 ++++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6235132a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 1812a11..c35e563 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -78,7 +78,7 @@ case class GenerateExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
-  val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)
+  lazy val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)
 
   protected override def doExecute(): RDD[InternalRow] = {
     // boundGenerator.terminate() should be triggered after all of the rows in the partition

http://git-wip-us.apache.org/repos/asf/spark/blob/6235132a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index f796a4c..4345a70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -69,6 +69,22 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
     )
   }
 
+  test("count distinct") {
+    val inputData = MemoryStream[(Int, Seq[Int])]
+
+    val aggregated =
+      inputData.toDF()
+        .select($"*", explode($"_2") as 'value)
+        .groupBy($"_1")
+        .agg(size(collect_set($"value")))
+        .as[(Int, Int)]
+
+    testStream(aggregated, Update)(
+      AddData(inputData, (1, Seq(1, 2))),
+      CheckLastBatch((1, 2))
+    )
+  }
+
   test("simple count, complete mode") {
     val inputData = MemoryStream[Int]
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message