spark-reviews mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [spark] cloud-fan commented on a change in pull request #26167: [SPARK-28893][SQL] Support MERGE INTO in the parser and add the corresponding logical plan
Date Fri, 08 Nov 2019 08:40:42 GMT
cloud-fan commented on a change in pull request #26167: [SPARK-28893][SQL] Support MERGE INTO in the parser and add the corresponding logical plan
URL: https://github.com/apache/spark/pull/26167#discussion_r344038513
 
 

 ##########
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ##########
 @@ -1178,11 +1178,57 @@ class Analyzer(
         // table by ResolveOutputRelation. that rule will alias the attributes to the table's names.
         o
 
+      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
+        if !m.resolved && targetTable.resolved && sourceTable.resolved =>
+        val newMatchedActions = m.matchedActions.map {
+          case DeleteAction(deleteCondition) =>
 +            val resolvedDeleteCondition = deleteCondition.map(resolveExpressionTopDown(_, m))
 +            DeleteAction(resolvedDeleteCondition)
+          case UpdateAction(updateCondition, assignments) =>
 +            val resolvedUpdateCondition = updateCondition.map(resolveExpressionTopDown(_, m))
 +            UpdateAction(resolvedUpdateCondition, resolveAssignments(assignments, m))
+          case o => o
+        }
+        val newNotMatchedActions = m.notMatchedActions.map {
+          case InsertAction(insertCondition, assignments) =>
 +            val resolvedInsertCondition = insertCondition.map(resolveExpressionTopDown(_, m))
 +            InsertAction(resolvedInsertCondition, resolveAssignments(assignments, m))
+          case o => o
+        }
+        val resolvedMergeCondition = resolveExpressionTopDown(m.mergeCondition, m)
+        m.copy(mergeCondition = resolvedMergeCondition,
+          matchedActions = newMatchedActions,
+          notMatchedActions = newNotMatchedActions)
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))
     }
 
+    def resolveAssignments(
+        assignments: Seq[Assignment],
+        mergeInto: MergeIntoTable): Seq[Assignment] = {
+      if (assignments.isEmpty) {
+        val expandedColumns = UnresolvedStar(None).expand(mergeInto.targetTable, resolver)
 
 Review comment:
   If you look at `UnresolvedStar.expand`, it just returns `input.output` when `target` is empty, so here we can simply write `val expandedColumns = mergeInto.targetTable.output`.
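
   For illustration, a minimal sketch of what the simplified helper could look like. The `Assignment`, `MergeIntoTable`, and `targetTable.output` names come from the diff above; the package paths, the enclosing object, and the way both branches are completed are assumptions for this example, not the PR's actual implementation:

   ```scala
   // Illustrative sketch only, not the PR's code. Package paths and the
   // identity-style expansion below are assumptions for this example.
   import org.apache.spark.sql.catalyst.expressions.Attribute
   import org.apache.spark.sql.catalyst.plans.logical.{Assignment, MergeIntoTable}

   object ResolveAssignmentsSketch {
     def resolveAssignments(
         assignments: Seq[Assignment],
         mergeInto: MergeIntoTable): Seq[Assignment] = {
       if (assignments.isEmpty) {
         // UnresolvedStar(None).expand(plan, resolver) returns plan.output when the star
         // has no target, so the expansion can use the plan's output directly.
         val expandedColumns: Seq[Attribute] = mergeInto.targetTable.output
         // Assumed continuation: expand `UPDATE SET *` / `INSERT *` into one assignment
         // per target column (the real rule would map source columns onto target columns).
         expandedColumns.map(attr => Assignment(attr, attr))
       } else {
         // Assumed continuation: explicit assignments are resolved elsewhere,
         // e.g. with resolveExpressionTopDown as in the diff above.
         assignments
       }
     }
   }
   ```

   Reading the columns straight from `targetTable.output` avoids constructing an `UnresolvedStar` only for it to fall back to `input.output`, which keeps the resolution path explicit.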

