spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-20534][SQL] Make outer generate exec return empty rows
Date Mon, 01 May 2017 16:46:53 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c890e938c -> 813abd2db


[SPARK-20534][SQL] Make outer generate exec return empty rows

## What changes were proposed in this pull request?
Generate exec does not produce `null` values if the generator output for the input row is empty and
the generate operates in outer mode without join. This is caused by the fact that the `join=false`
code path is different from the `join=true` code path, and that the `join=false` code path
did not deal with outer properly. This PR addresses this issue.

## How was this patch tested?
Updated `outer*` tests in `GeneratorFunctionSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17810 from hvanhovell/SPARK-20534.

(cherry picked from commit 6b44c4d63ab14162e338c5f1ac77333956870a90)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/813abd2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/813abd2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/813abd2d

Branch: refs/heads/branch-2.2
Commit: 813abd2db6140c4a294cdbeca2303dbfb7903107
Parents: c890e93
Author: Herman van Hovell <hvanhovell@databricks.com>
Authored: Mon May 1 09:46:35 2017 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Mon May 1 09:46:44 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  3 +-
 .../plans/logical/basicLogicalOperators.scala   |  2 +-
 .../spark/sql/execution/GenerateExec.scala      | 33 +++++++++++---------
 .../spark/sql/GeneratorFunctionSuite.scala      | 12 +++----
 4 files changed, 26 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index dd768d1..f2b9764 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -441,8 +441,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
       g.copy(child = prunedChild(g.child, g.references))
 
     // Turn off `join` for Generate if no column from it's child is used
-    case p @ Project(_, g: Generate)
-        if g.join && !g.outer && p.references.subsetOf(g.generatedSet) =>
+    case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet)
=>
       p.copy(child = g.copy(join = false))
 
     // Eliminate unneeded attributes from right side of a Left Existence Join.

http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 3ad757e..f663d7b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -83,7 +83,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
extend
  * @param join  when true, each output row is implicitly joined with the input tuple that
produced
  *              it.
  * @param outer when true, each input row will be output at least once, even if the output
of the
- *              given `generator` is empty. `outer` has no effect when `join` is false.
+ *              given `generator` is empty.
  * @param qualifier Qualifier for the attributes of generator(UDTF)
  * @param generatorOutput The output schema of the Generator.
  * @param child Children logical plan node

http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index f87d058..1812a11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow])
   extends Iterator[InternalRow] {
 
-  lazy val results = func().toIterator
+  lazy val results: Iterator[InternalRow] = func().toIterator
   override def hasNext: Boolean = results.hasNext
   override def next(): InternalRow = results.next()
 }
@@ -50,7 +50,7 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
  * @param join  when true, each output row is implicitly joined with the input tuple that
produced
  *              it.
  * @param outer when true, each input row will be output at least once, even if the output
of the
- *              given `generator` is empty. `outer` has no effect when `join` is false.
+ *              given `generator` is empty.
  * @param generatorOutput the qualified output attributes of the generator of this node,
which
  *                        constructed in analysis phase, and we can not change it, as the
  *                        parent node bound with it already.
@@ -78,15 +78,15 @@ case class GenerateExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
-  val boundGenerator = BindReferences.bindReference(generator, child.output)
+  val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)
 
   protected override def doExecute(): RDD[InternalRow] = {
     // boundGenerator.terminate() should be triggered after all of the rows in the partition
-    val rows = if (join) {
-      child.execute().mapPartitionsInternal { iter =>
-        val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
+    val numOutputRows = longMetric("numOutputRows")
+    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
+      val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
+      val rows = if (join) {
         val joinedRow = new JoinedRow
-
         iter.flatMap { row =>
           // we should always set the left (child output)
           joinedRow.withLeft(row)
@@ -101,18 +101,21 @@ case class GenerateExec(
           // keep it the same as Hive does
           joinedRow.withRight(row)
         }
+      } else {
+        iter.flatMap { row =>
+          val outputRows = boundGenerator.eval(row)
+          if (outer && outputRows.isEmpty) {
+            Seq(generatorNullRow)
+          } else {
+            outputRows
+          }
+        } ++ LazyIterator(boundGenerator.terminate)
       }
-    } else {
-      child.execute().mapPartitionsInternal { iter =>
-        iter.flatMap(boundGenerator.eval) ++ LazyIterator(boundGenerator.terminate)
-      }
-    }
 
-    val numOutputRows = longMetric("numOutputRows")
-    rows.mapPartitionsWithIndexInternal { (index, iter) =>
+      // Convert the rows to unsafe rows.
       val proj = UnsafeProjection.create(output, output)
       proj.initialize(index)
-      iter.map { r =>
+      rows.map { r =>
         numOutputRows += 1
         proj(r)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index cef5bbf..b9871af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -91,7 +91,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
     val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
     checkAnswer(
       df.select(explode_outer('intList)),
-      Row(1) :: Row(2) :: Row(3) :: Nil)
+      Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil)
   }
 
   test("single posexplode") {
@@ -105,7 +105,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
     val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
     checkAnswer(
       df.select(posexplode_outer('intList)),
-      Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Nil)
+      Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(null, null) :: Nil)
   }
 
   test("explode and other columns") {
@@ -161,7 +161,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(
       df.select(explode_outer('intList).as('int)).select('int),
-      Row(1) :: Row(2) :: Row(3) :: Nil)
+      Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil)
 
     checkAnswer(
       df.select(explode('intList).as('int)).select(sum('int)),
@@ -182,7 +182,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(
       df.select(explode_outer('map)),
-      Row("a", "b") :: Row("c", "d") :: Nil)
+      Row("a", "b") :: Row(null, null) :: Row("c", "d") :: Nil)
   }
 
   test("explode on map with aliases") {
@@ -198,7 +198,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(
       df.select(explode_outer('map).as("key1" :: "value1" :: Nil)).select("key1", "value1"),
-      Row("a", "b") :: Nil)
+      Row("a", "b") :: Row(null, null) :: Nil)
   }
 
   test("self join explode") {
@@ -279,7 +279,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
     )
     checkAnswer(
       df2.selectExpr("inline_outer(col1)"),
-      Row(3, "4") :: Row(5, "6") :: Nil
+      Row(null, null) :: Row(3, "4") :: Row(5, "6") :: Nil
     )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message