Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1ACFC200C77 for ; Mon, 1 May 2017 18:46:55 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1943B160BAE; Mon, 1 May 2017 16:46:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 396A0160BA0 for ; Mon, 1 May 2017 18:46:54 +0200 (CEST) Received: (qmail 83157 invoked by uid 500); 1 May 2017 16:46:53 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 83148 invoked by uid 99); 1 May 2017 16:46:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 May 2017 16:46:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4AAEFDFB91; Mon, 1 May 2017 16:46:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lixiao@apache.org To: commits@spark.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-20534][SQL] Make outer generate exec return empty rows Date: Mon, 1 May 2017 16:46:53 +0000 (UTC) archived-at: Mon, 01 May 2017 16:46:55 -0000 Repository: spark Updated Branches: refs/heads/branch-2.2 c890e938c -> 813abd2db [SPARK-20534][SQL] Make outer generate exec return empty rows ## What changes were proposed in this pull request? Generate exec does not produce `null` values if the generator for the input row is empty and the generate operates in outer mode without join. This is caused by the fact that the `join=false` code path is different from the `join=true` code path, and that the `join=false` code path did deal with outer properly. This PR addresses this issue. ## How was this patch tested? Updated `outer*` tests in `GeneratorFunctionSuite`. Author: Herman van Hovell Closes #17810 from hvanhovell/SPARK-20534. (cherry picked from commit 6b44c4d63ab14162e338c5f1ac77333956870a90) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/813abd2d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/813abd2d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/813abd2d Branch: refs/heads/branch-2.2 Commit: 813abd2db6140c4a294cdbeca2303dbfb7903107 Parents: c890e93 Author: Herman van Hovell Authored: Mon May 1 09:46:35 2017 -0700 Committer: gatorsmile Committed: Mon May 1 09:46:44 2017 -0700 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../plans/logical/basicLogicalOperators.scala | 2 +- .../spark/sql/execution/GenerateExec.scala | 33 +++++++++++--------- .../spark/sql/GeneratorFunctionSuite.scala | 12 +++---- 4 files changed, 26 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index dd768d1..f2b9764 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -441,8 +441,7 @@ object ColumnPruning extends Rule[LogicalPlan] { g.copy(child = prunedChild(g.child, g.references)) // Turn off `join` for Generate if no column from it's child is used - case p @ Project(_, g: Generate) - if g.join && !g.outer && p.references.subsetOf(g.generatedSet) => + case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) // Eliminate unneeded attributes from right side of a Left Existence Join. http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 3ad757e..f663d7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -83,7 +83,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend * @param join when true, each output row is implicitly joined with the input tuple that produced * it. * @param outer when true, each input row will be output at least once, even if the output of the - * given `generator` is empty. `outer` has no effect when `join` is false. + * given `generator` is empty. * @param qualifier Qualifier for the attributes of generator(UDTF) * @param generatorOutput The output schema of the Generator. * @param child Children logical plan node http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index f87d058..1812a11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow]) extends Iterator[InternalRow] { - lazy val results = func().toIterator + lazy val results: Iterator[InternalRow] = func().toIterator override def hasNext: Boolean = results.hasNext override def next(): InternalRow = results.next() } @@ -50,7 +50,7 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In * @param join when true, each output row is implicitly joined with the input tuple that produced * it. * @param outer when true, each input row will be output at least once, even if the output of the - * given `generator` is empty. `outer` has no effect when `join` is false. + * given `generator` is empty. * @param generatorOutput the qualified output attributes of the generator of this node, which * constructed in analysis phase, and we can not change it, as the * parent node bound with it already. @@ -78,15 +78,15 @@ case class GenerateExec( override def outputPartitioning: Partitioning = child.outputPartitioning - val boundGenerator = BindReferences.bindReference(generator, child.output) + val boundGenerator: Generator = BindReferences.bindReference(generator, child.output) protected override def doExecute(): RDD[InternalRow] = { // boundGenerator.terminate() should be triggered after all of the rows in the partition - val rows = if (join) { - child.execute().mapPartitionsInternal { iter => - val generatorNullRow = new GenericInternalRow(generator.elementSchema.length) + val numOutputRows = longMetric("numOutputRows") + child.execute().mapPartitionsWithIndexInternal { (index, iter) => + val generatorNullRow = new GenericInternalRow(generator.elementSchema.length) + val rows = if (join) { val joinedRow = new JoinedRow - iter.flatMap { row => // we should always set the left (child output) joinedRow.withLeft(row) @@ -101,18 +101,21 @@ case class GenerateExec( // keep it the same as Hive does joinedRow.withRight(row) } + } else { + iter.flatMap { row => + val outputRows = boundGenerator.eval(row) + if (outer && outputRows.isEmpty) { + Seq(generatorNullRow) + } else { + outputRows + } + } ++ LazyIterator(boundGenerator.terminate) } - } else { - child.execute().mapPartitionsInternal { iter => - iter.flatMap(boundGenerator.eval) ++ LazyIterator(boundGenerator.terminate) - } - } - val numOutputRows = longMetric("numOutputRows") - rows.mapPartitionsWithIndexInternal { (index, iter) => + // Convert the rows to unsafe rows. val proj = UnsafeProjection.create(output, output) proj.initialize(index) - iter.map { r => + rows.map { r => numOutputRows += 1 proj(r) } http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index cef5bbf..b9871af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -91,7 +91,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList") checkAnswer( df.select(explode_outer('intList)), - Row(1) :: Row(2) :: Row(3) :: Nil) + Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil) } test("single posexplode") { @@ -105,7 +105,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList") checkAnswer( df.select(posexplode_outer('intList)), - Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Nil) + Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(null, null) :: Nil) } test("explode and other columns") { @@ -161,7 +161,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { checkAnswer( df.select(explode_outer('intList).as('int)).select('int), - Row(1) :: Row(2) :: Row(3) :: Nil) + Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil) checkAnswer( df.select(explode('intList).as('int)).select(sum('int)), @@ -182,7 +182,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { checkAnswer( df.select(explode_outer('map)), - Row("a", "b") :: Row("c", "d") :: Nil) + Row("a", "b") :: Row(null, null) :: Row("c", "d") :: Nil) } test("explode on map with aliases") { @@ -198,7 +198,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { checkAnswer( df.select(explode_outer('map).as("key1" :: "value1" :: Nil)).select("key1", "value1"), - Row("a", "b") :: Nil) + Row("a", "b") :: Row(null, null) :: Nil) } test("self join explode") { @@ -279,7 +279,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { ) checkAnswer( df2.selectExpr("inline_outer(col1)"), - Row(3, "4") :: Row(5, "6") :: Nil + Row(null, null) :: Row(3, "4") :: Row(5, "6") :: Nil ) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org