spark-issues mailing list archives

From "Paul Lysak (JIRA)" <>
Subject [jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors
Date Fri, 03 Mar 2017 07:54:45 GMT


Paul Lysak commented on SPARK-19371:

I'm observing similar behavior in Spark 2.1. Unfortunately, due to the complex workflow of our
application, I haven't yet been able to identify exactly which operation causes all the partitions
of the DataFrame to end up on a single executor. As a result, no matter how big the cluster is,
only one executor picks up all the work.
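
A possible way to narrow this down is to count, after each suspect step, how many partitions each executor ends up computing. The snippet below is only a minimal sketch for spark-shell, not something taken from the report: `countByExecutor` and `partitionsPerExecutor` are hypothetical helper names, and it assumes that `SparkEnv.get.executorId` evaluated inside a task returns the id of the executor running that task. For a persisted DataFrame, tasks normally run where the cached blocks live, so the counts approximate the cache placement, although the scheduler may run a task elsewhere once its locality wait expires.

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.sql.DataFrame

// Hypothetical helper: turn a list of executor ids (one entry per partition)
// into a count of partitions per executor.
def countByExecutor(executorIds: Seq[String]): Map[String, Int] =
  executorIds.groupBy(identity).map { case (id, hits) => id -> hits.size }

// Hypothetical helper: run one lightweight task per partition of `df`
// and record the id of the executor that ran it.
def partitionsPerExecutor(df: DataFrame): Map[String, Int] = {
  val executorIds = df.rdd
    .mapPartitions(_ => Iterator(SparkEnv.get.executorId))
    .collect()
    .toSeq
  countByExecutor(executorIds)
}
```

Calling `partitionsPerExecutor(df)` right after the DataFrame is persisted and materialized, and again after each later transformation, should show the step at which the distribution collapses onto one executor.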

> Cannot spread cached partitions evenly across executors
> -------------------------------------------------------
>                 Key: SPARK-19371
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.6.1
>            Reporter: Thunder Stumpges
> Before running an intensive iterative job (in this case a distributed topic model training),
> we need to load a dataset and persist it across executors.
> After loading from HDFS and persisting, the partitions are spread unevenly across executors
> (based on the initial scheduling of the reads, which are not data-locality sensitive). The partition
> sizes are even, just not their distribution over executors. We currently have no way to force
> the partitions to spread evenly, and as the iterative algorithm begins, tasks are distributed
> to executors based on this initial load, forcing some very unbalanced work.
> This has been mentioned a number of times in various user/dev group threads.
> None of the discussions I could find had solutions that worked for me. Here are examples
> of things I have tried. All resulted in partitions in memory that were NOT evenly distributed
> to executors, causing future tasks to be imbalanced across executors as well.
> *Reduce Locality*
> {code}spark.shuffle.reduceLocality.enabled=false/true{code}
> *"Legacy" memory mode*
> {code}spark.memory.useLegacyMode = true/false{code}
> *Basic load and repartition*
> {code}
> val numPartitions = 48*16
> val df = sqlContext.read.
>     parquet("/data/folder_to_load").
>     repartition(numPartitions).
>     persist
> // count forces evaluation so the persisted partitions are materialized
> df.count
> {code}
> *Load and repartition to 2x partitions, then shuffle repartition down to desired partitions*
> {code}
> val numPartitions = 48*16
> val df2 = sqlContext.read.
>     parquet("/data/folder_to_load").
>     repartition(numPartitions*2)
> val df = df2.repartition(numPartitions).
>     persist
> // count forces evaluation so the persisted partitions are materialized
> df.count
> {code}
> It would be great if, when persisting an RDD/DataFrame, we could request that those
> partitions be stored evenly across executors in preparation for future tasks.
> I'm not sure if this is a more general issue (i.e. not just involving persisting RDDs),
> but for the persisted in-memory case, it can make a HUGE difference in the overall running
> time of the remaining work.

This message was sent by Atlassian JIRA
