spark-dev mailing list archives

From Guillaume Pitel <guillaume.pi...@exensa.com>
Subject Re: Decrease shuffle in TreeAggregate with coalesce ?
Date Thu, 28 Apr 2016 09:23:22 GMT
On 27/04/2016 at 19:41, Joseph Bradley wrote:
> Do you have code which can reproduce this performance drop in 
> treeReduce?  It would be helpful to debug.  In the 1.6 release, we 
> profiled it via the various MLlib algorithms and did not see 
> performance drops.
That would be difficult, but if we cannot find the cause, we'll design a small 
example to test it. I first have to check with the latest git version, and I 
have to recompile Spark with the LGPL version of netlib.
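
For reference, that rebuild is typically done with the netlib-lgpl Maven 
profile, something like:

    build/mvn -Pnetlib-lgpl -DskipTests clean package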

> It's not just renumbering the partitions; it is reducing the number of 
> partitions by a factor of 1.0/scale (where scale > 1).  This creates a 
> "tree"-structured aggregation so that more of the work of merging 
> during aggregation is done on the workers, not the driver.
>
Sure, I get that, and it wasn't my point. I just think coalesce also 
reduces the number of partitions, but without a shuffle, right?

_With Coalesce:_
Let's say we have 2 workers with 2 partitions each:

W0: p0,p1
W1: p2,p3

Since coalesce tries to avoid shuffling, coalesce(2) should group the 
contents of p0 and p1 into p0' (on W0) and p2 and p3 into p1' (on W1).
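
A minimal sketch (hypothetical local-mode setup, Spark 1.x API) showing that 
this kind of coalesce adds no shuffle stage to the lineage:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("coalesce-sketch"))
    val rdd = sc.parallelize(1 to 8, 4)   // four partitions p0..p3
    val merged = rdd.coalesce(2)          // shuffle = false by default: narrow dependency
    println(merged.toDebugString)         // lineage shows a CoalescedRDD, no ShuffledRDD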

OTOH, _with the current mapPartitionsWithIndex + modulo + reduceByKey_, let's 
say the partitions are numbered like this:

(0,p0),(1,p1),(2,p2),(3,p3)

Then after the modulo, (0,p0),(1,p1),(0,p2),(1,p3)

As a consequence, W1 will shuffle p2 to W0 and W0 will shuffle p1 to W1.
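
To make this concrete, here is a hedged sketch (not an actual patch) of what 
one tree level could look like with coalesce instead, reusing the names 
partiallyAggregated, curNumPartitions and cleanCombOp from the treeAggregate 
code quoted below:

    partiallyAggregated = partiallyAggregated
      .coalesce(curNumPartitions)   // narrow dependency: merges prefer co-located partitions
      .mapPartitions { iter =>
        if (iter.hasNext) Iterator(iter.reduce(cleanCombOp)) else Iterator.empty
      }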

Guillaume

> On Wed, Apr 27, 2016 at 4:46 AM, Guillaume Pitel 
> <guillaume.pitel@exensa.com> wrote:
>
>     Hi,
>
>     I've been looking at the code of RDD.treeAggregate, because we've
>     seen a huge performance drop between 1.5.2 and 1.6.1 on a
>     treeReduce. I think the treeAggregate code hasn't changed, so my
>     message is not about the performance drop, but a more general
>     remark about treeAggregate.
>
>     In treeAggregate, after the aggregate is applied inside the original
>     partitions, we enter the tree:
>
>
>     	while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
>     	  numPartitions /= scale
>     	  val curNumPartitions = numPartitions
>     	  partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
>     	    (i, iter) => iter.map((i % curNumPartitions, _))
>     	  }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
>     	}
>
>
>     The two lines where the partitions are numbered, then renumbered,
>     then reduced by key seem suboptimal to me. There is a huge
>     shuffle cost, while a simple coalesce followed by a
>     partition-level aggregation would probably do the job perfectly.
>
>     Have I missed something that makes this reshuffle necessary?
>
>     Best regards
>     Guillaume Pitel
>
>


-- 
eXenSa

Guillaume PITEL, Président
+33(0)626 222 431

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705

