spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject spark git commit: [SPARK-13996] [SQL] Add more not null attributes for Filter codegen
Date Sun, 03 Apr 2016 02:34:43 GMT
Repository: spark
Updated Branches:
  refs/heads/master 1cf701834 -> c2f25b1a1


[SPARK-13996] [SQL] Add more not null attributes for Filter codegen

## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-13996

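Filter codegen determines which attributes are not null by looking for IsNotNull(a) predicates
where child.output.contains(a). However, this check is not comprehensive: it only matches when
`a` is itself an output attribute, so it misses predicates such as IsNotNull(k + 1), where a
null-intolerant expression being non-null implies that the attributes it references are non-null
as well. We can improve it.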

E.g., for this plan:

    val rdd = sqlContext.sparkContext.makeRDD(Seq(Row(1, "1"), Row(null, "1"), Row(2, "2")))
    val schema = new StructType().add("k", IntegerType).add("v", StringType)
    val smallDF = sqlContext.createDataFrame(rdd, schema)
    val df = smallDF.filter("isnotnull(k + 1)")

The code snippet generated without this patch:

    /* 031 */   protected void processNext() throws java.io.IOException {
    /* 032 */     /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
    /* 033 */
    /* 034 */     /*** PRODUCE: INPUT */
    /* 035 */
    /* 036 */     while (!shouldStop() && inputadapter_input.hasNext()) {
    /* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 038 */       /*** CONSUME: Filter isnotnull((k#0 + 1)) */
    /* 039 */       /* input[0, int] */
    /* 040 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
    /* 041 */       int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
    /* 042 */
    /* 043 */       /* isnotnull((input[0, int] + 1)) */
    /* 044 */       /* (input[0, int] + 1) */
    /* 045 */       boolean filter_isNull3 = true;
    /* 046 */       int filter_value3 = -1;
    /* 047 */
    /* 048 */       if (!filter_isNull) {
    /* 049 */         filter_isNull3 = false; // resultCode could change nullability.
    /* 050 */         filter_value3 = filter_value + 1;
    /* 051 */
    /* 052 */       }
    /* 053 */       if (!(!(filter_isNull3))) continue;
    /* 054 */
    /* 055 */       filter_metricValue.add(1);

With this patch:

    /* 031 */   protected void processNext() throws java.io.IOException {
    /* 032 */     /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
    /* 033 */
    /* 034 */     /*** PRODUCE: INPUT */
    /* 035 */
    /* 036 */     while (!shouldStop() && inputadapter_input.hasNext()) {
    /* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 038 */       /*** CONSUME: Filter isnotnull((k#0 + 1)) */
    /* 039 */       /* input[0, int] */
    /* 040 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
    /* 041 */       int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
    /* 042 */
    /* 043 */       if (filter_isNull) continue;
    /* 044 */
    /* 045 */       filter_metricValue.add(1);

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #11810 from viirya/add-more-not-null-attrs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2f25b1a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2f25b1a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2f25b1a

Branch: refs/heads/master
Commit: c2f25b1a148eeb1791ea7018b14b3a665c13212a
Parents: 1cf7018
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Authored: Sat Apr 2 19:34:38 2016 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Sat Apr 2 19:34:38 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/basicOperators.scala      | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c2f25b1a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a6a14df..fb1c618 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -79,12 +79,12 @@ case class Filter(condition: Expression, child: SparkPlan)
 
   // Split out all the IsNotNulls from condition.
   private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
-    case IsNotNull(a) if child.output.exists(_.semanticEquals(a)) => true
+    case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true
     case _ => false
   }
 
   // The columns that will filtered out by `IsNotNull` could be considered as not nullable.
-  private val notNullAttributes = notNullPreds.flatMap(_.references)
+  private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId)
 
   // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate
   // all the variables at the beginning to take advantage of short circuiting.
@@ -92,7 +92,7 @@ case class Filter(condition: Expression, child: SparkPlan)
 
   override def output: Seq[Attribute] = {
     child.output.map { a =>
-      if (a.nullable && notNullAttributes.exists(_.semanticEquals(a))) {
+      if (a.nullable && notNullAttributes.contains(a.exprId)) {
         a.withNullability(false)
       } else {
         a
@@ -179,7 +179,7 @@ case class Filter(condition: Expression, child: SparkPlan)
     // Reset the isNull to false for the not-null columns, then the followed operators could
     // generate better code (remove dead branches).
     val resultVars = input.zipWithIndex.map { case (ev, i) =>
-      if (notNullAttributes.exists(_.semanticEquals(child.output(i)))) {
+      if (notNullAttributes.contains(child.output(i).exprId)) {
         ev.isNull = "false"
       }
       ev


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message