spark-issues mailing list archives

From "Thomas Graves (JIRA)" <>
Subject [jira] [Commented] (SPARK-11316) coalesce doesn't handle UnionRDD with partial locality properly
Date Fri, 29 Apr 2016 15:38:12 GMT


Thomas Graves commented on SPARK-11316:

Simple steps to reproduce an RDD with partial preferred locations. Any text file will do here:

val textFile = sc.textFile("randomtext2.txt")
val textFile2 = sc.textFile("")
val wordCounts4 = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
val wordCounts5 = textFile2.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
val urdd = wordCounts4.union(wordCounts5)

> coalesce doesn't handle UnionRDD with partial locality properly
> ---------------------------------------------------------------
>                 Key: SPARK-11316
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.1
>            Reporter: Thomas Graves
>            Assignee: Thomas Graves
>            Priority: Critical
> I haven't fully debugged this yet, but I'm reporting what I'm seeing and what I think might be going on.
> I have a graph processing job that is seeing a huge slowdown in setupGroups, in the location iterator where it's getting the preferred locations for the coalesce. They are coalescing from 2400 partitions down to 1200 and it's taking 17+ hours to do the calculation. I killed it at that point, so I don't know the total time.
> It appears that the job is doing an isEmpty call, a bunch of other transformations, then a coalesce (where it takes so long), other transformations, then finally a count to trigger the job.
> It appears that there is only one node that it's finding in the setupGroups call, and to get to that node it first has to go through the while loop:
>     while (numCreated < targetLen && tries < expectedCoupons2) {
> where expectedCoupons2 is around 19000.  It finds very few or none in this loop.  
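As a sanity check on the "around 19000" figure, here is a minimal sketch of how that bound could be computed. It assumes the coupon-collector style formula as I recall it from setupGroups in CoalescedRDD.scala; the object name CouponBudget is hypothetical, and the formula should be checked against the Spark version in use.

```scala
object CouponBudget {
  // Assumption: formula recalled from setupGroups in CoalescedRDD.scala,
  // not quoted in this message. Verify against the actual Spark source.
  def expectedCoupons2(targetLen: Int): Int =
    2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt
}
```

Under that assumption, a targetLen of 1200 gives 19416 tries, which matches the roughly 19000 iterations reported for the first loop.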
> Then it does the second loop:
> while (numCreated < targetLen) {  // if we don't have enough partition groups, create
>       var (nxt_replica, nxt_part) =
>       val pgroup = PartitionGroup(nxt_replica)
>       groupArr += pgroup
>       groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
>       var tries = 0
>       while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
>         nxt_part =
>         tries += 1
>       }
>       numCreated += 1
>     }
> It has an inner while loop, and both of those are going 1200 times, i.e. 1200*1200 = 1,440,000 iterations. This is taking a very long time.
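To make the iteration count concrete, here is a rough sketch that mimics only the shape of the second loop quoted above. It assumes the worst case where addPartToPGroup never succeeds, and replaces that call with a constant stand-in. The object and method names are hypothetical, and the real loop also advances the location iterator on every pass, which is not modeled here.

```scala
object SetupGroupsCost {
  // Counts inner-loop passes for a loop shaped like the quoted one,
  // assuming (hypothetically) that adding a partition always fails.
  def worstCaseIterations(targetLen: Int): Long = {
    var numCreated = 0
    var iterations = 0L
    while (numCreated < targetLen) {
      var tries = 0
      val added = false // stand-in for addPartToPGroup(nxt_part, pgroup)
      while (!added && tries < targetLen) {
        tries += 1
        iterations += 1
      }
      numCreated += 1
    }
    iterations
  }
}
```

For targetLen = 1200 this counts 1,440,000 passes. That number alone is not large, so the 17+ hours presumably comes from the cost of each pass (the preferred-location lookup in the location iterator), not from the loop count by itself.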
> The user can work around the issue by adding a count() call shortly after the isEmpty call, before the coalesce is called. I also tried putting a take(10000) right before the isEmpty call, and that also seems to work around the issue: it took 1 hour with the take vs. a few minutes with the count().
