spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-19471][SQL] AggregationIterator does not initialize the generated result projection before using it
Date Tue, 15 Aug 2017 15:51:21 GMT
Repository: spark
Updated Branches:
  refs/heads/master 12411b5ed -> bc9902587


[SPARK-19471][SQL] AggregationIterator does not initialize the generated result projection
before using it

## What changes were proposed in this pull request?

This is a follow-up PR that moves the test case in PR-18920 (https://github.com/apache/spark/pull/18920)
to DataFrameAggregateSuite.

## How was this patch tested?
unit test

Author: donnyzone <wellfengzhu@gmail.com>

Closes #18946 from DonnyZone/branch-19471-followingPR.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc990258
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc990258
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc990258

Branch: refs/heads/master
Commit: bc9902587a3a3fc6a835ec485c32c047f89100f2
Parents: 12411b5
Author: donnyzone <wellfengzhu@gmail.com>
Authored: Tue Aug 15 08:51:18 2017 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Tue Aug 15 08:51:18 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/DataFrameAggregateSuite.scala     | 47 ++++++++++++++++++++
 .../spark/sql/DataFrameFunctionsSuite.scala     | 45 -------------------
 2 files changed, 47 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc990258/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 69ea62e..affe971 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql
 
+import scala.util.Random
+
+import org.apache.spark.sql.execution.WholeStageCodegenExec
+import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec,
+  SortAggregateExec}
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -558,6 +562,49 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
     assert(e.message.contains("aggregate functions are not allowed in GROUP BY"))
   }
 
+  private def assertNoExceptions(c: Column): Unit = {
+    for ((wholeStage, useObjectHashAgg) <-
+         Seq((true, true), (true, false), (false, true), (false, false))) {
+      withSQLConf(
+        (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
+        (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
+
+        val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
+
+        // test case for HashAggregate
+        val hashAggDF = df.groupBy("x").agg(c, sum("y"))
+        val hashAggPlan = hashAggDF.queryExecution.executedPlan
+        if (wholeStage) {
+          assert(hashAggPlan.find {
+            case WholeStageCodegenExec(_: HashAggregateExec) => true
+            case _ => false
+          }.isDefined)
+        } else {
+          assert(hashAggPlan.isInstanceOf[HashAggregateExec])
+        }
+        hashAggDF.collect()
+
+        // test case for ObjectHashAggregate and SortAggregate
+        val objHashAggOrSortAggDF = df.groupBy("x").agg(c, collect_list("y"))
+        val objHashAggOrSortAggPlan = objHashAggOrSortAggDF.queryExecution.executedPlan
+        if (useObjectHashAgg) {
+          assert(objHashAggOrSortAggPlan.isInstanceOf[ObjectHashAggregateExec])
+        } else {
+          assert(objHashAggOrSortAggPlan.isInstanceOf[SortAggregateExec])
+        }
+        objHashAggOrSortAggDF.collect()
+      }
+    }
+  }
+
+  test("SPARK-19471: AggregationIterator does not initialize the generated result projection" +
+    " before using it") {
+    Seq(
+      monotonically_increasing_id(), spark_partition_id(),
+      rand(Random.nextLong()), randn(Random.nextLong())
+    ).foreach(assertNoExceptions)
+  }
+
   test("SPARK-21580 ints in aggregation expressions are taken as group-by ordinal.") {
     checkAnswer(
       testData2.groupBy(lit(3), lit(4)).agg(lit(6), lit(7), sum("b")),

http://git-wip-us.apache.org/repos/asf/spark/blob/bc990258/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index fdb9f1d..0681b9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -24,8 +24,6 @@ import scala.util.Random
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.execution.WholeStageCodegenExec
-import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec,
-  SortAggregateExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -451,49 +449,6 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
   }
 
-  private def assertNoExceptions(c: Column): Unit = {
-    for ((wholeStage, useObjectHashAgg) <-
-         Seq((true, true), (true, false), (false, true), (false, false))) {
-      withSQLConf(
-        (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
-        (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
-
-        val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
-
-        // HashAggregate test case
-        val hashAggDF = df.groupBy("x").agg(c, sum("y"))
-        val hashAggPlan = hashAggDF.queryExecution.executedPlan
-        if (wholeStage) {
-          assert(hashAggPlan.find {
-            case WholeStageCodegenExec(_: HashAggregateExec) => true
-            case _ => false
-          }.isDefined)
-        } else {
-          assert(hashAggPlan.isInstanceOf[HashAggregateExec])
-        }
-        hashAggDF.collect()
-
-        // ObjectHashAggregate and SortAggregate test case
-        val objHashAggOrSortAggDF = df.groupBy("x").agg(c, collect_list("y"))
-        val objHashAggOrSortAggPlan = objHashAggOrSortAggDF.queryExecution.executedPlan
-        if (useObjectHashAgg) {
-          assert(objHashAggOrSortAggPlan.isInstanceOf[ObjectHashAggregateExec])
-        } else {
-          assert(objHashAggOrSortAggPlan.isInstanceOf[SortAggregateExec])
-        }
-        objHashAggOrSortAggDF.collect()
-      }
-    }
-  }
-
-  test("SPARK-19471: AggregationIterator does not initialize the generated result projection" +
-    " before using it") {
-    Seq(
-      monotonically_increasing_id(), spark_partition_id(),
-      rand(Random.nextLong()), randn(Random.nextLong())
-    ).foreach(assertNoExceptions)
-  }
-
   test("SPARK-21281 use string types by default if array and map have no argument") {
     val ds = spark.range(1)
     var expectedSchema = new StructType()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message