spark-dev mailing list archives

From "Evan R. Sparks" <>
Subject Re: RDD API patterns
Date Sat, 26 Sep 2015 17:07:39 GMT

I believe the reason you're seeing near-identical performance on the
gradient computations is twofold:
1) Gradient computations for GLM models are computationally pretty cheap
from a FLOPs/byte read perspective. They are essentially a BLAS "gemv" call
in the dense case, which is well known to be bound by memory bandwidth on
modern processors. So, you're basically paying the cost of a scan of the
points you've sampled to do the gradient computation.
2) The default sampling mechanism used by the GradientDescent optimizer in
MLlib is implemented via RDD.sample, which, when sampling without
replacement as GradientDescent does, decides record by record whether to
keep each point (Bernoulli sampling). This requires a full scan of each
partition at every iteration to collect the samples.
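
To make the two points concrete, here is a minimal pure-Python sketch (not Spark code) of what one partition does per SGD iteration, assuming a least-squares loss and dense points stored as (label, features) pairs. The function names are hypothetical. The sampler keeps each record independently with probability f, so it must visit every record no matter how small f is; only the gradient work shrinks, and that work is just a few flops per 8-byte value read. Spark's own sampler is more elaborate, but it still walks the whole partition iterator.

```python
import random


def bernoulli_sample(records, fraction, rng):
    """Keep each record independently with probability `fraction`.

    Also returns how many records were visited, which is always
    len(records): the whole partition is scanned whatever `fraction` is.
    """
    kept, visited = [], 0
    for rec in records:
        visited += 1
        if rng.random() < fraction:
            kept.append(rec)
    return kept, visited


def partition_gradient(points, weights):
    """Summed least-squares gradient, sum of (x.w - y) * x over `points`.

    Every feature value read feeds one multiply-add in the dot product and
    one in the update, i.e. a few flops per 8-byte double: memory-bound
    work, much like BLAS gemv.
    """
    grad = [0.0] * len(weights)
    for label, x in points:
        residual = sum(xi * wi for xi, wi in zip(x, weights)) - label
        for j, xj in enumerate(x):
            grad[j] += residual * xj
    return grad


def sgd_step_on_partition(records, weights, fraction, seed):
    """One hypothetical per-partition step: sample, then take the gradient."""
    rng = random.Random(seed)
    sample, visited = bernoulli_sample(records, fraction, rng)
    return partition_gradient(sample, weights), visited, len(sample)


data = [(float(i), [1.0, float(i)]) for i in range(10_000)]
grad, visited, used = sgd_step_on_partition(data, [0.0, 0.0], 1e-3, seed=42)
print(visited, used)  # visited is 10000 for any fraction; used counts only sampled records
```

The scan in `bernoulli_sample` costs about the same as the gradient pass would on the full partition, which is consistent with the near-identical timings.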

So - you're going to pay the cost of a scan to do the sampling anyway, and
the gradient computation is essentially free at this point (and can be
pipelined, etc.).

It is quite possible to improve #2 by coming up with a better sampling
algorithm. One easy algorithm would be to assume the data is already
randomly shuffled (or shuffle it once) and then use the first
miniBatchFraction*partitionSize records on the first iteration, the second
set on the second iteration, and so on. You could prototype this algorithm
pretty easily by converting your data to an RDD[Array[DenseVector]] and
doing some bookkeeping at each iteration.
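
The shuffle-once, slice-per-iteration idea can be sketched in plain Python, treating one partition as a list. This is a sketch under assumptions: `shuffle_once` and `minibatch_slice` are hypothetical helpers, and the batch size is rounded up so that every iteration gets at least one record.

```python
import math
import random


def shuffle_once(partition, seed):
    """Shuffle a partition a single time, up front, so that consecutive
    slices behave like random minibatches."""
    shuffled = list(partition)
    random.Random(seed).shuffle(shuffled)
    return shuffled


def minibatch_slice(partition, fraction, iteration):
    """Records used on `iteration` (0-based) of a pre-shuffled partition.

    Iteration 0 takes the first ceil(fraction * n) records, iteration 1 the
    next block, and so on, starting over after the last block. Only the
    selected records are touched, so there is no per-iteration scan.
    """
    n = len(partition)
    if n == 0:
        return []
    batch = max(1, math.ceil(fraction * n))
    num_batches = math.ceil(n / batch)
    start = (iteration % num_batches) * batch
    return partition[start:start + batch]


part = shuffle_once(range(1000), seed=7)
first = minibatch_slice(part, 0.01, 0)   # the first 10 shuffled records
second = minibatch_slice(part, 0.01, 1)  # the next 10
```

In Spark, the same bookkeeping would presumably cache the shuffled data with one array per partition (`glom()` yields an RDD of arrays, close to the RDD[Array[DenseVector]] layout above) and pick the slice inside `mapPartitions`, with the iteration number captured in the closure. Each epoch visits every record exactly once, at the cost of a fixed minibatch order after the initial shuffle.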

That said - eventually the overheads of the platform catch up to you. As a
rule of thumb I estimate about 50ms/iteration as a floor for things like
task serialization and other platform overheads. You've got to balance how
much computation you want to do vs. the amount of time you want to spend
waiting for the platform.
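
The trade-off can be written as a simple cost model: per-iteration time is a fixed platform floor plus work proportional to the records touched. This sketch assumes that linear model; the function names and the per-record cost in the example are hypothetical placeholders, and only the roughly 50 ms floor comes from the estimate above.

```python
def iteration_time(overhead_s, records, secs_per_record):
    """Hypothetical model: fixed platform overhead plus per-record work."""
    return overhead_s + records * secs_per_record


def overhead_share(overhead_s, records, secs_per_record):
    """Fraction of one iteration spent waiting on the platform."""
    return overhead_s / iteration_time(overhead_s, records, secs_per_record)


def records_for_share(overhead_s, secs_per_record, share):
    """Records per iteration needed so overhead is at most `share` of the
    iteration; solves overhead / (overhead + r * c) = share for r."""
    return overhead_s * (1.0 - share) / (share * secs_per_record)


# ~50 ms floor; the 1 microsecond per-record cost is a made-up placeholder.
print(overhead_share(0.05, 1_000, 1e-6))
print(records_for_share(0.05, 1e-6, 0.1))
```

Shrinking the minibatch lowers only the second term, so once the work drops well below the floor, further reductions in f buy almost nothing per iteration.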

- Evan

On Sat, Sep 26, 2015 at 9:27 AM, Mike Hynes <> wrote:

> Hello Devs,
>
> This email concerns some timing results for a treeAggregate in
> computing a (stochastic) gradient over an RDD of labelled points, as
> is currently done in the MLlib optimization routine for SGD.
>
> In SGD, the underlying RDD is downsampled by a fraction f \in (0,1],
> and the subgradients over all the instances in the downsampled RDD are
> aggregated to the driver as a dense vector. However, we have noticed
> some unusual behaviour when f < 1: it takes the same amount of time to
> compute the stochastic gradient for a stochastic minibatch as it does
> for a full batch (f = 1).
>
> Attached are two plots of the mean task timing metrics for each level
> in the aggregation, which has been performed with 4 levels (level 4 is
> the final level, in which the results are communicated to the driver).
> 16 nodes are used, and the RDD has 256 partitions. We run in (client)
> standalone mode. Here, the total time for the tasks is shown (\tau)
> alongside the execution time (not counting GC),
> serialization/deserialization time, the GC time, and the difference
> between \tau and all other times, assumed to be variable
> IO/communication/waiting time. The RDD in this case is a labelled
> point representation of the KDD Bridge to Algebra dataset, with 20M
> (sparse) instances and a problem dimension of 30M. The instances are
> very sparse: each individual instance vector may have only a hundred
> nonzeros. All metrics have been taken from the JSON Spark event logs.
>
> The plot gradient_f1.pdf shows the times for a gradient computation
> with f = 1, and gradient_f-3.pdf shows the same metrics with f = 1e-3.
> For other f values in {1e-1, 1e-2, ..., 1e-5}, the same effect is
> observed.
>
> What I would like to mention about these plots, and ask if anyone has
> experience with, is the following:
> 1. The times are essentially identical; I would have thought that
> downsampling the RDD before aggregating the subgradients would at
> least reduce the execution time required, if not the
> communication/serialization times.
> 2. The serialization time in level 4 is almost entirely from the
> result serialization to the driver, and not the task deserialization.
> In each level of the treeAggregate, however, the local (dense)
> gradients have to be communicated between compute nodes, so I am
> surprised that it takes so much longer to return the vectors to the
> driver.
>
> I initially wondered if the large IO overhead in the last stage had
> anything to do with client mode vs. cluster mode, since, from what I
> understand, only a single core is allocated to the driver thread in
> client mode. However, when running tests in the two modes, I have
> previously seen no appreciable difference in the running time for
> other (admittedly smaller) problems. Furthermore, I am still very
> confused about why the execution time for each task is just as large
> for the downsampled RDD. It seems unlikely that sampling each
> partition would be as expensive as the gradient computations, even for
> sparse feature vectors.
>
> If anyone has experience working with the sampling in minibatch SGD or
> has tested the scalability of the treeAggregate operation for
> vectors, I'd really appreciate your thoughts.
>
> Thanks,
> Mike
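
The figures in the quoted message make the communication side easy to estimate. Because the subgradients are summed into a dense vector, every partial result is a full-length vector of doubles whatever f is: at a problem dimension of 30M that is 30,000,000 x 8 bytes = 240,000,000 bytes (about 240 MB) of raw payload per partial gradient, before any serialization overhead. The sketch below assumes a generic fan-in tree rather than Spark's actual treeAggregate scheduling; `dense_vector_bytes`, `tree_reduce`, and `fan_in` are hypothetical names. It shows that the number and size of vectors shipped per level depend only on the partition count and the dimension, not on the sampling fraction.

```python
DOUBLE_BYTES = 8


def dense_vector_bytes(dim):
    """Raw size of a dense vector of doubles, ignoring serialization overhead."""
    return dim * DOUBLE_BYTES


def tree_reduce(vectors, fan_in):
    """Sum equal-length vectors level by level, `fan_in` at a time.

    Returns the total and the number of vectors shipped at each level.
    This is a generic tree reduction, not Spark's treeAggregate code.
    """
    if fan_in < 2:
        raise ValueError("fan_in must be at least 2")
    if not vectors:
        raise ValueError("need at least one vector")
    level = [list(v) for v in vectors]
    shipped = []
    while len(level) > 1:
        shipped.append(len(level))
        level = [
            [sum(column) for column in zip(*level[i:i + fan_in])]
            for i in range(0, len(level), fan_in)
        ]
    return level[0], shipped


# 30M features -> 240000000 bytes per partial gradient.
print(dense_vector_bytes(30_000_000))
# 256 partial gradients with a hypothetical fan-in of 4 -> [256, 64, 16, 4].
total, shipped = tree_reduce([[1.0, 2.0]] * 256, fan_in=4)
print(shipped)
```

Under these assumptions, even the last few hops before the driver each carry full 240 MB vectors, which may help explain why serialization and IO at the final level do not drop as f shrinks.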
