spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Abhijit Bhole (JIRA)" <j...@apache.org>
Subject [jira] [Issue Comment Deleted] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join
Date Mon, 14 Aug 2017 13:51:03 GMT

     [ https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Abhijit Bhole updated SPARK-21034:
----------------------------------
    Comment: was deleted

(was: In FilterPushdownSuite.scala, it seems this should have been handled? Am I making some
mistake in understanding?
 
{code:java}
 test("nondeterministic: push down part of filter through aggregate with deterministic field")
{
    val originalQuery = testRelation
      .groupBy('a)('a)
      .where('a > 5 && Rand(10) > 5)
      .analyze

    val optimized = Optimize.execute(originalQuery.analyze)

    val correctAnswer = testRelation
      .where('a > 5)
      .groupBy('a)('a)
      .where(Rand(10) > 5)
      .analyze

    comparePlans(optimized, correctAnswer)
  }
{code}
)

> Allow filter pushdown filters through non deterministic functions for columns involved
in groupby / join
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21034
>                 URL: https://issues.apache.org/jira/browse/SPARK-21034
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 2.1.1, 2.2.0
>            Reporter: Abhijit Bhole
>
> If the column is involved in aggregation / join then pushing down filter should not change
the results.
> Here is a sample code - 
> {code:java}
> from pyspark.sql import functions as F
> df = spark.createDataFrame([{ "a": 1, "b" : 2, "c":7}, { "a": 3, "b" : 4, "c" : 8},
>                            { "a": 1, "b" : 5, "c":7}, { "a": 1, "b" : 6, "c":7} ])
> df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain()
> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
> == Physical Plan ==
> *HashAggregate(keys=[a#15L], functions=[sum(b#16L)])
> +- Exchange hashpartitioning(a#15L, 4)
>    +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)])
>       +- *Project [a#15L, b#16L]
>          +- *Filter (isnotnull(a#15L) && (a#15L = 1))
>             +- Scan ExistingRDD[a#15L,b#16L,c#17L]
> >>>
> >>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
> == Physical Plan ==
> *Filter (isnotnull(a#15L) && (a#15L = 1))
> +- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)])
>    +- Exchange hashpartitioning(a#15L, 4)
>       +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), partial_first(c#17L,
false)])
>          +- Scan ExistingRDD[a#15L,b#16L,c#17L]
> {code}
> As you can see, the filter is not pushed down when F.first aggregate function is used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message