spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-21351][SQL] Update nullability based on children's output
Date Wed, 04 Apr 2018 06:39:25 GMT
Repository: spark
Updated Branches:
  refs/heads/master 16ef6baa3 -> 5197562af


[SPARK-21351][SQL] Update nullability based on children's output

## What changes were proposed in this pull request?
This PR adds a new optimizer rule, `UpdateNullabilityInAttributeReferences`, to update the
nullability that `Filter` changes when it has an `IsNotNull` predicate. On master, optimized plans
do not respect this nullability, which wrongly generates unnecessary
code. For example:

```
scala> val df = Seq((Some(1), Some(2))).toDF("a", "b")
scala> val bIsNotNull = df.where($"b" =!= 2).select($"b")
scala> val targetQuery = bIsNotNull.distinct
scala> targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = true

scala> targetQuery.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*HashAggregate(keys=[b#19], functions=[], output=[b#19])
+- Exchange hashpartitioning(b#19, 200)
   +- *HashAggregate(keys=[b#19], functions=[], output=[b#19])
      +- *Project [_2#16 AS b#19]
         +- *Filter isnotnull(_2#16)
            +- LocalTableScan [_1#15, _2#16]

Generated code:
...
/* 124 */   protected void processNext() throws java.io.IOException {
...
/* 132 */     // output the result
/* 133 */
/* 134 */     while (agg_mapIter.next()) {
/* 135 */       wholestagecodegen_numOutputRows.add(1);
/* 136 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 137 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 138 */
/* 139 */       boolean agg_isNull4 = agg_aggKey.isNullAt(0);
/* 140 */       int agg_value4 = agg_isNull4 ? -1 : (agg_aggKey.getInt(0));
/* 141 */       agg_rowWriter1.zeroOutNullBytes();
/* 142 */
                // We don't need this NULL check because NULL is filtered out in `$"b" =!= 2`
/* 143 */       if (agg_isNull4) {
/* 144 */         agg_rowWriter1.setNullAt(0);
/* 145 */       } else {
/* 146 */         agg_rowWriter1.write(0, agg_value4);
/* 147 */       }
/* 148 */       append(agg_result1);
/* 149 */
/* 150 */       if (shouldStop()) return;
/* 151 */     }
/* 152 */
/* 153 */     agg_mapIter.close();
/* 154 */     if (agg_sorter == null) {
/* 155 */       agg_hashMap.free();
/* 156 */     }
/* 157 */   }
/* 158 */
/* 159 */ }
```

On line 143, we don't need this NULL check because NULL has already been filtered out by `$"b" =!= 2`.
This PR removes the check:

```
scala> targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = false

scala> targetQuery.debugCodegen
...
Generated code:
...
/* 144 */   protected void processNext() throws java.io.IOException {
...
/* 152 */     // output the result
/* 153 */
/* 154 */     while (agg_mapIter.next()) {
/* 155 */       wholestagecodegen_numOutputRows.add(1);
/* 156 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 157 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 158 */
/* 159 */       int agg_value4 = agg_aggKey.getInt(0);
/* 160 */       agg_rowWriter1.write(0, agg_value4);
/* 161 */       append(agg_result1);
/* 162 */
/* 163 */       if (shouldStop()) return;
/* 164 */     }
/* 165 */
/* 166 */     agg_mapIter.close();
/* 167 */     if (agg_sorter == null) {
/* 168 */       agg_hashMap.free();
/* 169 */     }
/* 170 */   }
```
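
A quick way to confirm the end-to-end effect in a spark-shell (a hedged sketch of the session above, assuming an active `SparkSession` with its implicits imported):

```
// Rebuild the example query from this description and assert the new behavior:
// the IsNotNull that `$"b" =!= 2` implies now propagates into the optimized
// plan's output nullability.
val df = Seq((Some(1), Some(2))).toDF("a", "b")
val targetQuery = df.where($"b" =!= 2).select($"b").distinct
assert(!targetQuery.queryExecution.optimizedPlan.output(0).nullable)
```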

## How was this patch tested?
Added `UpdateNullabilityInAttributeReferencesSuite` for unit tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #18576 from maropu/SPARK-21351.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5197562a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5197562a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5197562a

Branch: refs/heads/master
Commit: 5197562afe8534b29f5a0d72683c2859f796275d
Parents: 16ef6ba
Author: Takeshi Yamamuro <yamamuro@apache.org>
Authored: Wed Apr 4 14:39:19 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Wed Apr 4 14:39:19 2018 +0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 19 ++++++-
 ...eNullabilityInAttributeReferencesSuite.scala | 57 ++++++++++++++++++++
 .../catalyst/optimizer/complexTypesSuite.scala  |  9 ----
 .../org/apache/spark/sql/DataFrameSuite.scala   |  5 --
 4 files changed, 75 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5197562a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2829d1d..9a1bbc6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -153,7 +153,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       RewritePredicateSubquery,
       ColumnPruning,
       CollapseProject,
-      RemoveRedundantProject)
+      RemoveRedundantProject) :+
+    Batch("UpdateAttributeReferences", Once,
+      UpdateNullabilityInAttributeReferences)
   }
 
   /**
@@ -1309,3 +1311,18 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       }
   }
 }
+
+/**
+ * Updates nullability in [[AttributeReference]]s if nullability is different between
+ * non-leaf plan's expressions and the children output.
+ */
+object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case p if !p.isInstanceOf[LeafNode] =>
+      val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x => x -> x.nullable })
+      p transformExpressions {
+        case ar: AttributeReference if nullabilityMap.contains(ar) =>
+          ar.withNullability(nullabilityMap(ar))
+      }
+  }
+}
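
As a side note on why the lookup in the hunk above matches across plan nodes (a minimal sketch, not part of the patch; it only assumes spark-catalyst on the classpath): `AttributeMap` keys on expression IDs rather than full attribute equality, and `withNullability` copies an attribute with the flag changed but the `exprId` kept, so a child's refreshed output still matches the parent's stale reference:

```
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
import org.apache.spark.sql.types.IntegerType

val b = AttributeReference("b", IntegerType, nullable = true)()
val childOutput = b.withNullability(false)  // same exprId, nullable = false
val nullabilityMap = AttributeMap(Seq(childOutput -> childOutput.nullable))
assert(nullabilityMap.contains(b))          // matched by exprId, not by nullability
assert(nullabilityMap(b) == false)          // so the stale reference can be updated
```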

http://git-wip-us.apache.org/repos/asf/spark/blob/5197562a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
new file mode 100644
index 0000000..09b11f5
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+
+class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
+
+  object Optimizer extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Constant Folding", FixedPoint(10),
+          NullPropagation,
+          ConstantFolding,
+          BooleanSimplification,
+          SimplifyConditionals,
+          SimplifyBinaryComparison,
+          SimplifyExtractValueOps) ::
+      Batch("UpdateAttributeReferences", Once,
+        UpdateNullabilityInAttributeReferences) :: Nil
+  }
+
+  test("update nullability in AttributeReference")  {
+    val rel = LocalRelation('a.long.notNull)
+    // In the 'original' plans below, the Aggregate node produced by groupBy() has a
+    // nullable AttributeReference to `b`, because both array indexing and map lookup are
+    // nullable expressions. After optimization, the same attribute is now non-nullable,
+    // but the AttributeReference is not updated to reflect this. So, we need to update nullability
+    // by the `UpdateNullabilityInAttributeReferences` rule.
+    val original = rel
+      .select(GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) as "b")
+      .groupBy($"b")("1")
+    val expected = rel.select('a as "b").groupBy($"b")("1").analyze
+    val optimized = Optimizer.execute(original.analyze)
+    comparePlans(optimized, expected)
+  }
+}
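
For intuition on what this test exercises (a sketch using the same catalyst test DSL, not part of the patch): `GetArrayItem` is nullable regardless of its inputs because an out-of-range ordinal returns null, so the alias `b` starts out nullable, while the plain `'a` that `SimplifyExtractValueOps` rewrites it to is non-nullable. The new rule is what brings the `Aggregate`'s stale `AttributeReference` back in sync:

```
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem, Literal}

val a = 'a.long.notNull                                 // non-nullable base attribute
val indexed = GetArrayItem(CreateArray(Seq(a, a + 1L)), Literal(0))
assert(indexed.nullable)  // array indexing may miss, so the expression is nullable
assert(!a.nullable)       // the simplified replacement is not
```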

http://git-wip-us.apache.org/repos/asf/spark/blob/5197562a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
index 21ed987..633d86d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
@@ -378,15 +378,6 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper {
       .groupBy($"foo")("1")
     checkRule(structRel, structExpected)
 
-    // These tests must use nullable attributes from the base relation for the following reason:
-    // in the 'original' plans below, the Aggregate node produced by groupBy() has a
-    // nullable AttributeReference to a1, because both array indexing and map lookup are
-    // nullable expressions. After optimization, the same attribute is now non-nullable,
-    // but the AttributeReference is not updated to reflect this. In the 'expected' plans,
-    // the grouping expressions have the same nullability as the original attribute in the
-    // relation. If that attribute is non-nullable, the tests will fail as the plans will
-    // compare differently, so for these tests we must use a nullable attribute. See
-    // SPARK-23634.
     val arrayRel = relation
       .select(GetArrayItem(CreateArray(Seq('nullable_id, 'nullable_id + 1L)), 0) as "a1")
       .groupBy($"a1")("1")

http://git-wip-us.apache.org/repos/asf/spark/blob/5197562a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f7b3393..60e84e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2055,11 +2055,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       expr: String,
       expectedNonNullableColumns: Seq[String]): Unit = {
     val dfWithFilter = df.where(s"isnotnull($expr)").selectExpr(expr)
-    // In the logical plan, all the output columns of input dataframe are nullable
-    dfWithFilter.queryExecution.optimizedPlan.collect {
-      case e: Filter => assert(e.output.forall(_.nullable))
-    }
-
     dfWithFilter.queryExecution.executedPlan.collect {
       // When the child expression in isnotnull is null-intolerant (i.e. any null input will
       // result in null output), the involved columns are converted to not nullable;

