spark-issues mailing list archives

From "Reynold Xin (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-18367) DataFrame join spawns unreasonably high number of open files
Date Fri, 11 Nov 2016 05:46:58 GMT

    [ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656234#comment-15656234 ]

Reynold Xin edited comment on SPARK-18367 at 11/11/16 5:46 AM:
---------------------------------------------------------------

Actually, try setting "spark.shuffle.sort.bypassMergeThreshold" to a smaller number (say 10).
The maximum number of files Spark will create, without you explicitly changing the number of
partitions, is bypassMergeThreshold * N, where N is the number of pre-shuffle partitions. The
bypassMergeThreshold default is 200.

The reason you didn't see it with a small number of distinct keys is that there is a shortcut
that skips creating a file when no record matches that hash bucket.

This is working as expected.
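The file-count arithmetic above can be sketched in plain Python. This is an illustrative sketch, not Spark code: the helper name `max_bypass_merge_files` is made up, and only the config key "spark.shuffle.sort.bypassMergeThreshold" and the default of 200 come from the comment itself.

```python
# Sketch of the worst-case shuffle-file count described above.
# On the bypass-merge path, each pre-shuffle (map) partition may write
# up to bypassMergeThreshold separate files, so the bound multiplies.

def max_bypass_merge_files(bypass_merge_threshold: int,
                           pre_shuffle_partitions: int) -> int:
    """Upper bound on shuffle files created without explicitly
    changing the number of partitions."""
    return bypass_merge_threshold * pre_shuffle_partitions

# Default threshold (200) with, say, 12 pre-shuffle partitions:
print(max_bypass_merge_files(200, 12))   # 2400
# Lowering the threshold to 10, as suggested above:
print(max_bypass_merge_files(10, 12))    # 120
```

In a PySpark session the threshold would be set via something like `SparkSession.builder.config("spark.shuffle.sort.bypassMergeThreshold", "10")` before the join runs.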




> DataFrame join spawns unreasonably high number of open files
> ------------------------------------------------------------
>
>                 Key: SPARK-18367
>                 URL: https://issues.apache.org/jira/browse/SPARK-18367
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.1, 2.1.0
>         Environment: Python 3.5, Java 8
>            Reporter: Nicholas Chammas
>         Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open files, causing
my job to crash. 10K is the macOS limit on how many files a single process can have open at
once. It seems unreasonable that Spark should hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the problem by adding
a {{coalesce(1)}} in the right place, as indicated in the comments above. When I do, Spark
spawns no more than 600 open files. The number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large number of
distinct keys? If so, how would one mitigate that issue? If not, is this a bug in Spark?
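A side note on the 10K figure mentioned in the report: the per-process open-file ceiling can be inspected from Python's standard-library `resource` module. This is a diagnostic sketch, not part of the original report; `resource` is POSIX-only (macOS/Linux).

```python
import resource

# Soft limit: what the process currently gets before open() starts
# failing with EMFILE. Hard limit: the ceiling the process could raise
# its soft limit to. The reporter's crash implies the soft limit
# (roughly 10K per process on macOS) was exhausted by shuffle files.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open-file descriptors: soft limit={soft}, hard limit={hard}")
```

Comparing the soft limit against the expected shuffle-file count (post-shuffle partitions times map tasks) gives a quick sanity check before running a large join locally.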



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

