spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-9096) Unevenly distributed task loads after using JavaRDD.subtract()
Date Thu, 16 Jul 2015 19:37:05 GMT

    [ https://issues.apache.org/jira/browse/SPARK-9096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14630219#comment-14630219
] 

Sean Owen commented on SPARK-9096:
----------------------------------

Pretty sure I see the reason -- [~mengxr] this may be of interest.

The issue is that Vector hash codes changed in 1.4, so that they only sample the first 16
elements. These are quite sparse vectors, so many more vectors now have the same hash code
because they have the same first 16 elements (presumably 0s). I verified that in 1.3.1 the
training set here has about 183K distinct vectors and 183K distinct hash codes, but in 1.4.1,
there are still 183K distinct vectors but about 2600 distinct hash codes. Since there are
1600 partitions, the distribution isn't very even at all when any shuffle-related operation
kicks in.

So, in a sense this is on purpose, since the hash code change had a number of other upsides
in speed. The result is still correct, but the partitioning is just not very even. So this
is a special case of a more general issue: unbalanced data.

One solution is to subclass HashPartitioner and substitute a different hash code for Vector
that is exactly the one in Spark but without the restriction to first 16 elements. It's easier
than it sounds.

Maybe simpler still would be to transform the LabeledPoint to tuples of (LabeledPoint, hashCode)
where hashCode is again an operation that looks at the whole vector. Then subtractByKey, then
take just the keys (that's what subtract does underneath anyway). No custom partitioner there.

[~mengxr] are there better options?

In your very specific use case though -- if you are making a test/train split, use randomSplit()
instead to get both RDDs at once. It's faster than using sample and subtract anyway.

> Unevenly distributed task loads after using JavaRDD.subtract()
> --------------------------------------------------------------
>
>                 Key: SPARK-9096
>                 URL: https://issues.apache.org/jira/browse/SPARK-9096
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.0, 1.4.1
>            Reporter: Gisle Ytrestøl
>            Priority: Minor
>         Attachments: ReproduceBug.java, hanging-one-task.jpg, reproduce.1.3.1.log.gz,
reproduce.1.4.1.log.gz
>
>
> When using JavaRDD.subtract(), it seems that the tasks are unevenly distributed in the
the following operations on the new JavaRDD which is created by "subtract". The result is
that in the following operation on the new JavaRDD, a few tasks process almost all the data,
and these tasks will take a long time to finish. 
> I've reproduced this bug in the attached Java file, which I submit with spark-submit.

> The logs for 1.3.1 and 1.4.1 are attached. In 1.4.1, we see that a few tasks in the count
job takes a lot of time:
> 15/07/16 09:13:17 INFO TaskSetManager: Finished task 1459.0 in stage 2.0 (TID 4659) in
708 ms on 148.251.190.217 (1597/1600)
> 15/07/16 09:13:17 INFO TaskSetManager: Finished task 1586.0 in stage 2.0 (TID 4786) in
772 ms on 148.251.190.217 (1598/1600)
> 15/07/16 09:17:51 INFO TaskSetManager: Finished task 1382.0 in stage 2.0 (TID 4582) in
275019 ms on 148.251.190.217 (1599/1600)
> 15/07/16 09:20:02 INFO TaskSetManager: Finished task 1230.0 in stage 2.0 (TID 4430) in
407020 ms on 148.251.190.217 (1600/1600)
> 15/07/16 09:20:02 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed,
from pool 
> 15/07/16 09:20:02 INFO DAGScheduler: ResultStage 2 (count at ReproduceBug.java:56) finished
in 420.024 s
> 15/07/16 09:20:02 INFO DAGScheduler: Job 0 finished: count at ReproduceBug.java:56, took
442.941395 s
> In comparison, all tasks are more or less equal in size when running the same application
in Spark 1.3.1. In overall, this
> attached application (ReproduceBug.java) takes about 7 minutes on Spark 1.4.1, and completes
in roughly 30 seconds in Spark 1.3.1. 
> Spark 1.4.0 behaves similar to Spark 1.4.1 wrt this issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message