spark-dev mailing list archives

From Evan Sparks <evan.spa...@gmail.com>
Subject Re: Gradient Descent with large model size
Date Sat, 17 Oct 2015 21:23:49 GMT
Yes, remember that your bandwidth is the maximum rate at which data can be shipped to the
driver. So if you've got 5 blocks of that size, then it looks like you're basically saturating
the network. 
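
As a rough sanity check, here is that arithmetic spelled out as a small Scala snippet (the
12M-double gradient, the 5 partitions, and the 247 Mbit/s iperf measurement are the figures
quoted below; the 1 Gbit/s line is only for comparison):

  // Back-of-the-envelope transfer times, using the sizes and bandwidth quoted in this thread.
  val doublesPerGradient = 12000000L
  val bitsPerGradient    = doublesPerGradient * 64                   // a Double is 8 bytes = 64 bits
  val measuredMbps       = 247.0                                     // iperf measurement from below
  val secsPerBlock       = bitsPerGradient / (measuredMbps * 1e6)    // ~3.1 s per partial gradient
  val secsForFiveBlocks  = 5 * secsPerBlock                          // ~15.5 s if all 5 go to the driver
  val secsOnGigabit      = bitsPerGradient / 1e9                     // ~0.77 s on a full 1 Gbit/s link
  println(f"one block: $secsPerBlock%.2f s, five blocks: $secsForFiveBlocks%.2f s, 1 Gbit/s: $secsOnGigabit%.2f s")

The ~15.5 s figure is in the same ballpark as the ~13-16 s per iteration reported below once
4-5 partitions are aggregated on the driver.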

Aggregation trees help when you have many partitions/nodes, and butterfly mixing can help use
all of the network's resources. I have seen implementations of butterfly mixing in Spark but don't
know if we've got one in mainline. Zhao and Canny's work [1] details this approach in the
context of model fitting. 
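
For illustration only, here is a tiny local simulation of the butterfly communication pattern
(this is not Spark code and not Zhao and Canny's exact algorithm; it just shows how, with n = 2^k
workers, round r pairs worker i with worker i XOR 2^r, so every link is used in parallel and every
worker ends up with the global average after log2(n) rounds):

  // Toy butterfly mixing: n workers, each starts with its own "model" vector.
  val n = 8                                                   // assume a power of two
  val models = Array.tabulate(n)(i => Array.fill(4)(i.toDouble))
  val rounds = Integer.numberOfTrailingZeros(n)               // log2(n) exchange rounds
  for (r <- 0 until rounds) {
    val next = models.map(_.clone())
    for (i <- 0 until n) {
      val partner = i ^ (1 << r)                              // the worker to exchange with this round
      for (j <- models(i).indices)
        next(i)(j) = (models(i)(j) + models(partner)(j)) / 2.0
    }
    for (i <- 0 until n) models(i) = next(i)
  }
  println(models.map(_.head).mkString(", "))                  // every worker now holds the average (3.5)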

At any rate, for this type of ANN work with huge models in *any* distributed setting, you're
going to need to get faster networking (most production deployments I know of have either
10 gigabit Ethernet or 40 gigabit InfiniBand links), or figure out a way to decrease the frequency
or density of updates. Doing both would be even better. 

[1] http://www.cs.berkeley.edu/~jfc/papers/13/butterflymixing.pdf

> On Oct 17, 2015, at 12:47 PM, Joseph Bradley <joseph@databricks.com> wrote:
> 
> The decrease in running time from N=6 to N=7 makes some sense to me; that should be when
> tree aggregation kicks in.  I'd call it an improvement that the running time stays at roughly
> the same ~13 sec as N increases from 6 to 9.
> 
> "Does this mean that for 5 nodes with treeaggreate of depth 1 it will take 5*3.1~15.5
seconds?"
> --> I would guess so since all of that will be aggregated on the driver, but I don't
know enough about Spark's shuffling/networking to say for sure.  Others may be able to help
more.
> 
> Your numbers do make me wonder if we should examine the structure of the tree aggregation
> more carefully and see if we can improve it.  https://issues.apache.org/jira/browse/SPARK-11168
> 
> Joseph
> 
>> On Thu, Oct 15, 2015 at 7:01 PM, Ulanov, Alexander <alexander.ulanov@hpe.com> wrote:
>> Hi Joseph,
>> 
>>  
>> 
>> There seems to be no improvement if I run it with more partitions or a greater depth:
>> 
>> N = 6 Avg time: 13.491579108666668
>> N = 7 Avg time: 8.929480508
>> N = 8 Avg time: 14.507123471999998
>> N = 9 Avg time: 13.854871645333333
>> 
>> Depth = 3:
>> N = 2 Avg time: 8.853895346333333
>> N = 5 Avg time: 15.991574924666667
>> 
>>  
>> 
>> I also measured the bandwidth of my network with iperf. It shows 247 Mbit/s. So the transfer
>> of a 12M-element array-of-doubles message should take (64 bits * 12M) / 247 Mbit/s ≈ 3.1 s. Does
>> this mean that for 5 nodes with treeAggregate of depth 1 it will take 5 * 3.1 ≈ 15.5 seconds?
>> 
>>  
>> 
>> Best regards, Alexander
>> 
>> From: Joseph Bradley [mailto:joseph@databricks.com] 
>> Sent: Wednesday, October 14, 2015 11:35 PM
>> To: Ulanov, Alexander
>> Cc: dev@spark.apache.org
>> Subject: Re: Gradient Descent with large model size
>> 
>>  
>> 
>> For those numbers of partitions, I don't think you'll actually use tree aggregation.  The
>> number of partitions needs to be over a certain threshold (>= 7) before treeAggregate really
>> operates on a tree structure:
>> 
>> https://github.com/apache/spark/blob/9808052b5adfed7dafd6c1b3971b998e45b2799a/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1100
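>> 
>> The idea behind it, sketched as a small standalone helper (illustrative only, this is not the
>> linked Spark code): partial aggregates are combined in groups of `scale` per level, so the
>> final combine on the driver only sees a handful of values instead of one per partition.
>> 
>>   // Illustrative tree-style combine over already-computed per-partition aggregates.
>>   def treeCombine[A](partials: Seq[A], combOp: (A, A) => A, scale: Int = 2): A = {
>>     require(partials.nonEmpty && scale >= 2)
>>     var level = partials
>>     while (level.size > scale) {
>>       // each group of `scale` partial results is merged into one value for the next level
>>       level = level.grouped(scale).map(_.reduce(combOp)).toSeq
>>     }
>>     level.reduce(combOp)   // final, driver-side combine over at most `scale` values
>>   }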
>> 
>>  
>> 
>> Do you see a slower increase in running time with more partitions?  For 5 partitions,
>> do you find things improve if you tell treeAggregate to use depth > 2?
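>> 
>> (For reference, a minimal sketch of passing an explicit depth; it reuses rdd and dataSize from
>> the benchmark quoted below, with the dummy loss term dropped:)
>> 
>>   val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
>>     seqOp  = (c, v) => (c._1, c._2, c._3 + 1),            // same (grad, loss, count) shape as below
>>     combOp = (c1, c2) => (c1._1, c1._2 + c2._2, c1._3 + c2._3),
>>     depth  = 3)                                           // the default depth is 2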
>> 
>>  
>> 
>> Joseph
>> 
>>  
>> 
>> On Wed, Oct 14, 2015 at 1:18 PM, Ulanov, Alexander <alexander.ulanov@hpe.com> wrote:
>> 
>> Dear Spark developers,
>> 
>>  
>> 
>> I have noticed that Gradient Descent in Spark MLlib takes a long time if the model is large.
>> It is implemented with treeAggregate. I've extracted the code from GradientDescent.scala to
>> perform the benchmark. It allocates an Array of a given size and then aggregates it:
>> 
>>  
>> 
>> val dataSize = 12000000
>> val n = 5
>> val maxIterations = 3
>> val rdd = sc.parallelize(0 until n, n).cache()
>> rdd.count()
>> var avgTime = 0.0
>> for (i <- 1 to maxIterations) {
>>   val start = System.nanoTime()
>>   val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
>>     seqOp = (c, v) => {
>>       // c: (grad, loss, count)
>>       val l = 0.0
>>       (c._1, c._2 + l, c._3 + 1)
>>     },
>>     combOp = (c1, c2) => {
>>       // c: (grad, loss, count)
>>       (c1._1, c1._2 + c2._2, c1._3 + c2._3)
>>     })
>>   avgTime += (System.nanoTime() - start) / 1e9
>>   assert(result._1.length == dataSize)
>> }
>> println("Avg time: " + avgTime / maxIterations)
>> 
>>  
>> 
>> If I run it on my cluster of 1 master and 5 workers, I get the following results (given an
>> array size of 12M):
>> 
>> n = 1 Avg time: 4.555709667333333
>> n = 2 Avg time: 7.059724584666667
>> n = 3 Avg time: 9.937117377666667
>> n = 4 Avg time: 12.687526233
>> n = 5 Avg time: 12.939526129666667
>> 
>>  
>> 
>> Could you explain why the time becomes so big? The data transfer of a 12M array of doubles
>> should take ~1 second on a 1 Gbit network. There might be other overheads, but not big enough
>> to explain what I observe.
>> 
>> Best regards, Alexander
>> 
> 
