spark-issues mailing list archives

From "Daniel Darabos (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles
Date Tue, 03 Nov 2015 10:38:28 GMT

    [ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14987046#comment-14987046 ]

Daniel Darabos commented on SPARK-1239:
---------------------------------------

I can also add some data. I have a ShuffleMapStage with 82,714 tasks and then a ResultStage
with 222,609 tasks. The driver cannot serialize this:

{noformat}
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2271) ~[na:1.7.0_79]
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) ~[na:1.7.0_79]
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.7.0_79]
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) ~[na:1.7.0_79]
        at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) ~[na:1.7.0_79]
        at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) ~[na:1.7.0_79]
        at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:146) ~[na:1.7.0_79]
        at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1893) ~[na:1.7.0_79]
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1874) ~[na:1.7.0_79]
        at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1821) ~[na:1.7.0_79]
        at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:718) ~[na:1.7.0_79]
        at java.io.ObjectOutputStream.close(ObjectOutputStream.java:739) ~[na:1.7.0_79]
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:362) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1294) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
        at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:361) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
        at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:312) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
        at org.apache.spark.MapOutputTrackerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(MapOutputTracker.scala:49) ~[spark-assembly-1.4.0-hadoop2.4.0.jar:1.4.0]
{noformat}

I see {{getSerializedMapOutputStatuses}} has changed a lot since 1.4.0, but it still returns
an array whose size is proportional to _M * R_. How can this be part of a scalable system?
How is this not a major issue for everyone? Am I doing something wrong?

I'm now thinking that if the overwhelming majority of blocks is empty (or non-empty), the
bitmap will compress very well. But it's possible that I am ending up with a relatively even
mix of empty and non-empty blocks, which kills the compression. I have about 40 billion lines
and _M * R_ is about 20 billion, so the average mapper-reducer block holds only about two
lines; whether a given block is empty is then close to a coin flip, so this seems plausible.
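
To make that concrete, here is a small standalone sketch (my own illustration, not Spark's
actual MapStatus code) that gzips a per-mapper bitmap of non-empty blocks. A skewed bitmap
compresses to almost nothing, while an even random mix stays close to its raw 128 KiB:

{code}
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream
import scala.util.Random

object BitmapCompression {
  // Gzip a byte array and return the compressed size in bytes.
  def gzippedSize(bytes: Array[Byte]): Int = {
    val bos = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(bos)
    gzip.write(bytes)
    gzip.close()
    bos.size()
  }

  // Pack one "block is non-empty" flag per reducer into a bitmap.
  def toBitmap(flags: Array[Boolean]): Array[Byte] = {
    val bytes = new Array[Byte]((flags.length + 7) / 8)
    for (i <- flags.indices if flags(i))
      bytes(i / 8) = (bytes(i / 8) | (1 << (i % 8))).toByte
    bytes
  }

  def main(args: Array[String]): Unit = {
    val numBlocks = 1 << 20                                   // one mapper, ~1M reducers
    val skewed = Array.tabulate(numBlocks)(_ % 100 == 0)      // 1% non-empty
    val mixed  = Array.fill(numBlocks)(Random.nextBoolean())  // ~50/50 mix
    println(s"skewed: ${gzippedSize(toBitmap(skewed))} bytes compressed")
    println(s"mixed:  ${gzippedSize(toBitmap(mixed))} bytes compressed")
  }
}
{code}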

It's also possible that I should have larger partitions. Due to the processing I do that is
not an option: it leads to the executors OOMing. But larger partitions would not be a scalable
solution anyway. If _M_ and _R_ are reasonable now with some number of lines per partition,
then when your data size doubles they will also double, and _M * R_ will quadruple. At some
point the number of lines per map output block will be low enough that compression becomes
ineffective.

I see https://issues.apache.org/jira/browse/SPARK-11271 has recently decreased the map status
size by 20%. Since the status size grows quadratically with the data size, that means in Spark
1.6 I will be able to process 1/sqrt(0.8) ≈ 1.12 times, i.e. about 12%, more data than now.
The way I understand the situation, the required improvement is orders of magnitude larger
than that. I'm currently hitting this issue with 5 TB of input; if I tried processing 5 PB,
the map status would be a million times larger.
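
Here is the same arithmetic as a back-of-envelope sketch, using my stage sizes from above and
assuming _M_ and _R_ scale linearly with the input (the 5 TB baseline is my workload, not a
general figure):

{code}
// Back-of-envelope: if M and R both grow linearly with input size,
// the M * R map status table grows quadratically.
object MapStatusScaling {
  def main(args: Array[String]): Unit = {
    val baseInputTB = 5.0    // input size where the OOM above hits
    val baseM = 82714.0      // ShuffleMapStage tasks from my job
    val baseR = 222609.0     // ResultStage tasks from my job
    for (factor <- Seq(1.0, 2.0, 1000.0)) {  // 5 TB, 10 TB, 5 PB
      val blocks = (baseM * factor) * (baseR * factor)
      println(f"input ${baseInputTB * factor}%.0f TB -> M*R = $blocks%.2e blocks " +
        f"(${factor * factor}%.0fx the current table)")
    }
  }
}
{code}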

I like the premise of this JIRA ticket: not building the map status table in the first place.
But a colleague of mine asks whether we could avoid tracking this data in the driver altogether.
If the driver just provided the reducers with the list of mappers, each reducer could ask the
mappers directly for the list of blocks it should fetch.
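
For what it's worth, a hypothetical sketch of that idea (the names and interfaces below are
mine, nothing here exists in Spark); the point is the data volume at each hop:

{code}
// Hypothetical interfaces sketching the colleague's proposal.
case class MapperLocation(host: String, port: Int)

trait MapperService {
  // Returns the size of the single shuffle block this mapper wrote for
  // the given reducer: O(1) data per (mapper, reducer) pair, on demand.
  def blockSize(shuffleId: Int, reduceId: Int): Long
}

trait Driver {
  // The driver only tracks where the M mappers ran: O(M), not O(M * R).
  def mapperLocations(shuffleId: Int): Seq[MapperLocation]
}

class Reducer(driver: Driver, connect: MapperLocation => MapperService) {
  // Each reducer contacts the M mappers directly, so the O(M * R) table
  // is never materialized in any single process.
  def blocksToFetch(shuffleId: Int, reduceId: Int): Seq[(MapperLocation, Long)] =
    driver.mapperLocations(shuffleId)
      .map(loc => loc -> connect(loc).blockSize(shuffleId, reduceId))
      .filter(_._2 > 0)  // skip empty blocks
}
{code}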

> Don't fetch all map output statuses at each reducer during shuffles
> -------------------------------------------------------------------
>
>                 Key: SPARK-1239
>                 URL: https://issues.apache.org/jira/browse/SPARK-1239
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.0.2, 1.1.0
>            Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a mapper and a reducer - or we should just piggyback the statuses on each task.



