spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-22018][SQL] Preserve top-level alias metadata when collapsing projects
Date Fri, 15 Sep 2017 05:32:25 GMT
Repository: spark
Updated Branches:
  refs/heads/master a28728a9a -> 88661747f


[SPARK-22018][SQL] Preserve top-level alias metadata when collapsing projects

## What changes were proposed in this pull request?
Consider two projects as follows.
```
Project [a_with_metadata#27 AS b#26]
+- Project [a#0 AS a_with_metadata#27]
   +- LocalRelation <empty>, [a#0, b#1]
```
The child Project has an output column with metadata in it, and the parent Project has an alias
that implicitly forwards the metadata, so this metadata is visible to higher operators. Upon
applying the CollapseProject optimizer rule, the metadata is not preserved.
```
Project [a#0 AS b#26]
+- LocalRelation <empty>, [a#0, b#1]
```
This is incorrect, as downstream operators that expect certain metadata (e.g. watermark in
structured streaming) to identify certain fields will fail to do so. This PR fixes it by preserving
the metadata of top-level aliases.

## How was this patch tested?
New unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #19240 from tdas/SPARK-22018.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88661747
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88661747
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88661747

Branch: refs/heads/master
Commit: 88661747f506e73c79de36711daebb0330de7b0d
Parents: a28728a
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Thu Sep 14 22:32:16 2017 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Thu Sep 14 22:32:16 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  5 ++++-
 .../optimizer/CollapseProjectSuite.scala        | 23 ++++++++++++++++++--
 2 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88661747/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0880bd6..db276fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2256,7 +2256,10 @@ object CleanupAliases extends Rule[LogicalPlan] {
 
   def trimNonTopLevelAliases(e: Expression): Expression = e match {
     case a: Alias =>
-      a.withNewChildren(trimAliases(a.child) :: Nil)
+      a.copy(child = trimAliases(a.child))(
+        exprId = a.exprId,
+        qualifier = a.qualifier,
+        explicitMetadata = Some(a.metadata))
     case other => trimAliases(other)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/88661747/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
index 587437e..e7a5bce 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Rand
+import org.apache.spark.sql.catalyst.expressions.{Alias, Rand}
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.MetadataBuilder
 
 class CollapseProjectSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
@@ -119,4 +120,22 @@ class CollapseProjectSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("preserve top-level alias metadata while collapsing projects") {
+    def hasMetadata(logicalPlan: LogicalPlan): Boolean = {
+      logicalPlan.asInstanceOf[Project].projectList.exists(_.metadata.contains("key"))
+    }
+
+    val metadata = new MetadataBuilder().putLong("key", 1).build()
+    val analyzed =
+      Project(Seq(Alias('a_with_metadata, "b")()),
+        Project(Seq(Alias('a, "a_with_metadata")(explicitMetadata = Some(metadata))),
+          testRelation.logicalPlan)).analyze
+    require(hasMetadata(analyzed))
+
+    val optimized = Optimize.execute(analyzed)
+    val projects = optimized.collect { case p: Project => p }
+    assert(projects.size === 1)
+    assert(hasMetadata(optimized))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message