[ https://issues.apache.org/jira/browse/SPARK-23500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Henry Robinson updated SPARK-23500:
-----------------------------------
Description:
Simple filters on dataframes joined with {{joinWith()}} miss an opportunity to be pushed into the scan, because they are written in terms of a {{named_struct}} that the optimizer could remove.
Given the following simple query over two dataframes:
{code:java}
scala> val df = spark.read.parquet("one_million")
df: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
scala> val df2 = spark.read.parquet("one_million")
df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
scala> df.joinWith(df2, df2.col("id") === df.col("id2")).filter("_2.id > 30").explain
== Physical Plan ==
*(2) BroadcastHashJoin [_1#94.id2], [_2#95.id], Inner, BuildRight
:- *(2) Project [named_struct(id, id#0L, id2, id2#1L) AS _1#94]
:  +- *(2) FileScan parquet [id#0L,id2#1L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,id2:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:bigint,id2:bigint>, false].id))
   +- *(1) Project [named_struct(id, id#90L, id2, id2#91L) AS _2#95]
      +- *(1) Filter (named_struct(id, id#90L, id2, id2#91L).id > 30)
         +- *(1) FileScan parquet [id#90L,id2#91L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/henry/src/spark/one_million], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,id2:bigint>
{code}
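For comparison, the same predicate applied to a plain dataframe does reach the scan. The snippet below is a sketch rather than captured output, so the exact {{PushedFilters}} rendering may vary by version:
{code:java}
scala> df2.filter("id > 30").explain
// With no struct wrapper in the way, the predicate shows up in the
// FileScan, along the lines of:
//   PushedFilters: [IsNotNull(id), GreaterThan(id,30)]
{code}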
Using {{joinWith}} means that the filter is expressed against a {{named_struct}}, and is therefore not pushed down into the scan ({{PushedFilters}} is empty above). But when the filter sits just above the scan, wrapping the columns in a struct and immediately projecting a field back out, {{named_struct(id...).id}}, is a no-op that the optimizer could remove. The filter could then be pushed down to Parquet.
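A minimal sketch of the kind of optimizer rule that could do this, written against the Catalyst expression API ({{GetStructField}} over {{CreateNamedStruct}}). The rule name is invented for illustration and this is not the committed fix:
{code:java}
import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, GetStructField}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Extracting a field from a struct that was just assembled by
// named_struct is a wrap-then-unwrap round trip, so
// named_struct(id, id#90L, id2, id2#91L).id reduces to id#90L.
object SimplifyNamedStructExtract extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case GetStructField(struct: CreateNamedStruct, ordinal, _) =>
      // valExprs lists the struct's value expressions in field order.
      struct.valExprs(ordinal)
  }
}
{code}
Once the extraction collapses to a plain attribute reference, the existing predicate-pushdown rules can move the filter below the {{Project}} and into the Parquet scan. Until then, filtering before the join avoids the struct wrapper entirely:
{code:java}
// Workaround: filter the input dataframe before joinWith so the
// predicate only ever references plain columns.
val filtered = df2.filter("id > 30")
df.joinWith(filtered, filtered.col("id") === df.col("id2")).explain
{code}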
> Filters on named_structs could be pushed into scans
> ---------------------------------------------------
>
> Key: SPARK-23500
> URL: https://issues.apache.org/jira/browse/SPARK-23500
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Henry Robinson
> Priority: Major
>