spark-user mailing list archives

From Roshan Nair <ros...@indix.com>
Subject Re: Execution blocked when collect()ing some relatively big blocks on spark 0.9
Date Wed, 19 Feb 2014 17:14:16 GMT
Hi Guillaume,

We migrated on Monday as well, and something very similar seems to be
happening to us.

In my case, the driver logs and the executor logs end up in the same state,
except that the tasks the executor reports as finished are from the previous
stage; it doesn't seem to have received the new task at all. The UI says this
happened in a reduceByKeyLocally, which is preceded by a broadcast and a
mapPartitions. All three show up as a single stage, labelled
reduceByKeyLocally.
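
For reference, the shape of that part of the job is roughly the following
(a minimal sketch with made-up data, only meant to show the operation order,
not our actual code):

    import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 0.9)

    // hypothetical broadcast -> mapPartitions -> reduceByKeyLocally chain
    val dict = sc.broadcast(Map(0 -> 0.5, 1 -> 1.5, 2 -> 2.5))
    val pairs = sc.parallelize(1 to 1000000, 20).mapPartitions { it =>
      it.map(i => (i % 10, dict.value.getOrElse(i % 3, 1.0)))
    }
    // reduceByKeyLocally reduces within each partition on the executors and
    // merges the per-partition maps on the driver, returning a plain Map,
    // which is why the whole chain shows up as one stage in the UI.
    val merged: scala.collection.Map[Int, Double] =
      pairs.reduceByKeyLocally(_ + _)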

Yesterday I was able to get past this by increasing the memory per worker
from 4 GB to 5 GB, even though the RDD only takes about 600 MB per slave
(according to the app UI). The exact same code (before recompiling for
Scala 2.10 and Spark 0.9) ran comfortably with 4 GB per slave.
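
(Concretely, the bump was just the standalone memory settings -- e.g.
SPARK_WORKER_MEMORY=5g in conf/spark-env.sh on each worker, plus the matching
per-application value, roughly like this:)

    // Spark 0.9 SparkConf; "5g" was "4g" before the bump
    val conf = new SparkConf()
      .setAppName("my-job")                      // app name made up
      .set("spark.executor.memory", "5g")
    val sc = new SparkContext(conf)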

Roshan


On Wed, Feb 19, 2014 at 10:06 PM, Guillaume Pitel <
guillaume.pitel@exensa.com> wrote:

> By the way, I said the collect()ed blocks are relatively big because
> just before this collect() there are a few much smaller collectAsMap() and
> collect() calls that are not blocking. I'm really not sure it has anything
> to do with the size, though, since I've previously done this with much
> bigger blocks; 20 MB is not supposed to block anything.
>
> Guillaume
>
> Hi, we switched from 0.8.1 to 0.9.0 on Monday, and we're facing a
> problem whose cause is not obvious.
>
> Basically, we generate a random dense matrix (2M rows * 40 columns), split
> it into 20 blocks, collect() it and then broadcast it.
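>
> A simplified sketch of that part (made-up names, with jblas's FloatMatrix
> as the matrix type -- not the exact code):
>
>   // generate the 2M x 40 random matrix in 20 blocks, collect, broadcast
>   import org.jblas.FloatMatrix
>   val nBlocks = 20
>   val rowsPerBlock = 2000000 / nBlocks
>   val blocks: Array[(Int, (Array[Int], FloatMatrix))] =
>     sc.parallelize(0 until nBlocks, nBlocks)
>       .map(i => (i, (Array.fill(rowsPerBlock)(i),         // payload made up
>                      FloatMatrix.rand(rowsPerBlock, 40))))
>       .collect()                        // <- this is the call that hangs
>   val blocksBc = sc.broadcast(blocks)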
>
> The generation is fine, but then the workers send their blocks back and
> nothing more happens; Spark stays locked in this state forever. Here is
> what happens on the driver:
>
> 14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:13 as 2083
> bytes in 0 ms
> 14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:14 as TID 2294
> on executor 1: t4.exensa.loc (PROCESS_LOCAL)
> 14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:14 as 2083
> bytes in 0 ms
> 14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:15 as TID 2295
> on executor 4: t3.exensa.loc (PROCESS_LOCAL)
> 14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:15 as 2083
> bytes in 1 ms
> 14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:16 as TID 2296
> on executor 0: t0.exensa.loc (PROCESS_LOCAL)
> 14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:16 as 2083
> bytes in 2 ms
> 14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:17 as TID 2297
> on executor 3: t1.exensa.loc (PROCESS_LOCAL)
> 14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:17 as 2083
> bytes in 1 ms
> 14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:18 as TID 2298
> on executor 2: t5.exensa.loc (PROCESS_LOCAL)
> 14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:18 as 2083
> bytes in 1 ms
> 14/02/19 16:21:49 INFO TaskSetManager: Starting task 12.0:19 as TID 2299
> on executor 5: t6.exensa.loc (PROCESS_LOCAL)
> 14/02/19 16:21:49 INFO TaskSetManager: Serialized task 12.0:19 as 2083
> bytes in 1 ms
>
>
> And on an executor :
>
> 14/02/19 15:21:53 INFO Executor: Serialized size of result for 2287 is
> 17229427
> 14/02/19 15:21:53 INFO Executor: Sending result for 2287 directly to driver
> 14/02/19 15:21:53 INFO Executor: Serialized size of result for 2299 is
> 17229262
> 14/02/19 15:21:53 INFO Executor: Sending result for 2299 directly to driver
> 14/02/19 15:21:53 INFO Executor: Finished task ID 2299
> 14/02/19 15:21:53 INFO Executor: Finished task ID 2287
> 14/02/19 15:21:53 INFO Executor: Serialized size of result for 2281 is
> 17229426
> 14/02/19 15:21:53 INFO Executor: Sending result for 2281 directly to driver
> 14/02/19 15:21:53 INFO Executor: Finished task ID 2281
>
>
> And... that's all. The driver never receives the notification that the
> tasks are finished. I have akka.frameSize=512 and a Kryo buffer of 512 MB.
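>
> (That is, roughly the following settings -- a sketch with the Spark 0.9
> property names, not the exact code:)
>
>   val conf = new SparkConf()
>     .set("spark.akka.frameSize", "512")        // in MB
>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     .set("spark.kryoserializer.buffer.mb", "512")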
>
> DEBUG log level does not add anything (at least on the executor side; I
> didn't try it on the driver).
>
> The RDD being collected is made of (Int, (Array[Int], FloatMatrix)) tuples.
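>
> (If it matters, registering those element classes with Kryo looks roughly
> like this -- a sketch, the registrator name is made up:)
>
>   import com.esotericsoftware.kryo.Kryo
>   import org.apache.spark.serializer.KryoRegistrator
>   import org.jblas.FloatMatrix
>
>   class MatrixRegistrator extends KryoRegistrator {
>     override def registerClasses(kryo: Kryo) {
>       kryo.register(classOf[FloatMatrix])
>       kryo.register(classOf[Array[Int]])
>     }
>   }
>   // plus: conf.set("spark.kryo.registrator", "MatrixRegistrator")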
>
> Any help would be greatly appreciated.
>
> Thanks
> Guillaume
> --
>    [image: eXenSa]
>  *Guillaume PITEL, Président*
> +33(0)6 25 48 86 80
>
> eXenSa S.A.S. <http://www.exensa.com/>
>  41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>
>
