spark-dev mailing list archives

From Sean Owen <so...@cloudera.com>
Subject Re: Directly broadcasting (sort of) RDDs
Date Mon, 23 Mar 2015 12:05:38 GMT
Since RDDs aren't designed as random-access maps, and are basically
bits of bookkeeping that make sense only on the driver, I think the
realization of something like this in Spark would realistically be
"collect RDD to local data structure" if anything.
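
To make the "collect to a local data structure" option concrete, here is a
rough plain-Python sketch (it is not Spark code). The helper names
collect_as_map and multiply_block, and the sparse layout of one
(row_index, value) list per column, are assumptions made purely for
illustration.

```python
# Plain-Python stand-in for "collect to a local data structure"; no Spark involved.
# All names here are hypothetical and exist only for this illustration.

def collect_as_map(partitions):
    """Flatten partitions of (key, value) pairs into one local dict,
    which is roughly what a driver-side collect produces."""
    local = {}
    for part in partitions:
        for key, value in part:
            local[key] = value
    return local


def multiply_block(dense_rows, sparse_cols):
    """Multiply a dense block (list of rows) by a sparse block given as
    one list of (row_index, value) pairs per output column."""
    return [
        [sum(row[k] * v for k, v in col) for col in sparse_cols]
        for row in dense_rows
    ]


# Dense matrix split into two row blocks, one block per "partition".
dense_partitions = [
    [(0, [[1.0, 2.0], [3.0, 4.0]])],
    [(1, [[5.0, 6.0]])],
]

# After collecting, every block can be looked up by key in one local map.
dense_by_block = collect_as_map(dense_partitions)

# One sparse block with two columns: column 0 has a single nonzero at row 1,
# column 1 has nonzeros at rows 0 and 1.
sparse_block = [[(1, 2.0)], [(0, 1.0), (1, 1.0)]]

result = {
    block_id: multiply_block(rows, sparse_block)
    for block_id, rows in dense_by_block.items()
}
```

The whole map has to fit in memory wherever it is used, which is the
limitation raised in the quoted thread below.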

It sounds like you're looking for a distributed cache, and there are
frameworks for that which can be used alongside Spark, without Spark
having to rebuild that too.
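
As a rough sketch of what a worker-side lookup with a local cache might look
like in front of such an external store (plain Python, nothing here is a
Spark API): the class CachedBlockClient, the fetch_remote callable, and the
max_blocks bound are all hypothetical names chosen for this example.

```python
from collections import OrderedDict


class CachedBlockClient:
    """Per-worker LRU cache in front of a remote block lookup.

    `fetch_remote` is a hypothetical callable standing in for whatever the
    external cache or store exposes; it is not a Spark API.
    """

    def __init__(self, fetch_remote, max_blocks):
        self._fetch_remote = fetch_remote
        self._max_blocks = max_blocks
        self._cache = OrderedDict()
        self.remote_fetches = 0

    def get(self, block_id):
        if block_id in self._cache:
            # Cache hit: mark the block as most recently used.
            self._cache.move_to_end(block_id)
            return self._cache[block_id]
        # Cache miss: go to the remote store and remember the result.
        self.remote_fetches += 1
        block = self._fetch_remote(block_id)
        self._cache[block_id] = block
        if len(self._cache) > self._max_blocks:
            # Evict the least recently used block to bound memory use.
            self._cache.popitem(last=False)
        return block


# A dict stands in for the remote store in this sketch.
remote_store = {i: [float(i)] * 3 for i in range(4)}
client = CachedBlockClient(remote_store.__getitem__, max_blocks=2)

for block_id in [0, 1, 0, 2, 1]:
    client.get(block_id)
```

Bounding the cache by block count keeps memory use predictable when the
full data set is too big for one executor; only the blocks a task actually
touches are transferred.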

On Mon, Mar 23, 2015 at 12:00 PM, Guillaume Pitel
<guillaume.pitel@exensa.com> wrote:
> Not far, but not exactly. The RDD could be too big to fit in memory.
>
> The idea is more like a worker-side rdd.lookup() with local cache.
>
> Guillaume
>
> In a sentence, is this the idea of collecting an RDD to memory on each
> executor directly?
>
> On Sun, Mar 22, 2015 at 10:56 PM, Sandy Ryza <sandy.ryza@cloudera.com>
> wrote:
>
> Hi Guillaume,
>
> I've long thought something like this would be useful - i.e. the ability to
> broadcast RDDs directly without first pulling data through the driver.  If I
> understand correctly, your requirement to "block" a matrix up and only fetch
> the needed parts could be implemented on top of this by splitting an RDD
> into a set of smaller RDDs and then broadcasting each one on its own.
>
> Unfortunately nobody is working on this currently (and I couldn't promise to
> have bandwidth to review it at the moment either), but I suspect we'll
> eventually need to add something like this for map joins in Hive on Spark
> and Spark SQL.
>
> -Sandy
>
>
>
> On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel
> <guillaume.pitel@exensa.com> wrote:
>
> Hi,
>
> Thanks for your answer. This is precisely the use case I'm interested in,
> but I already knew about it; I should have mentioned it. Unfortunately this
> implementation of BlockMatrix has (in my opinion) some disadvantages (the
> fact that it splits the matrix by range instead of using a modulo is bad for
> block skewness). Besides, and more importantly, as I was writing, it uses
> the join solution (actually a cogroup:
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
> line 361). The replication of the elements of the dense matrix is thus
> dependent on the block size.
>
> Actually I'm wondering if what I want to achieve could be done with a
> simple modification to the join, allowing a partition to be weakly cached
> after being retrieved.
>
> Guillaume
>
>
> There is a block matrix in Spark 1.3:
> http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>
> However, I believe it only supports dense matrix blocks.
>
> Still, it might be possible to use it or extend it.
>
> JIRA:
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>
> It was based on:
> https://github.com/amplab/ml-matrix
>
> Another lib:
> https://github.com/PasaLab/marlin/blob/master/README.md
>
> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
> <guillaume.pitel@exensa.com> wrote:
>
> Hi,
>
> I have an idea that I would like to discuss with the Spark devs. The
> idea comes from a very real problem that I have struggled with for
> almost a year. My problem is very simple: it's a dense matrix * sparse
> matrix operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is
> divided into X large blocks (one block per partition), and a sparse matrix
> RDD[((Int,Int),Array[Array[(Int,Float)]])], divided into X * Y blocks. The
> most efficient way to perform the operation is to collectAsMap() the
> dense matrix and broadcast it, then perform the block-local
> multiplications, and combine the results by column.
>
> This is quite fine, unless the matrix is too big to fit in memory
> (especially since the multiplication is performed several times
> iteratively, and the broadcasts are not always cleaned from memory as I
> would naively expect).
>
> When the dense matrix is too big, a second solution is to split the big
> sparse matrix into several RDDs, and do several broadcasts. Doing this
> creates quite a big overhead, but it mostly works, even though I often
> face some problems with inaccessible broadcast files, for instance.
>
> Then there is the terrible but apparently very effective good old join.
> Since X blocks of the sparse matrix use the same block from the dense
> matrix, I suspect that the dense matrix is somehow replicated X times
> (either on disk or in the network), which is the reason why the join
> takes so much time.
>
> After this bit of context, here is my idea: would it be possible to
> somehow "broadcast" (or maybe more accurately, share or serve) a
> persisted RDD which is distributed on all workers, in a way that would,
> a bit like the IndexedRDD, allow a task to access a partition or an
> element of a partition in the closure, with a worker-local memory cache?
> That is, the information about where each block resides would be
> distributed on the workers, to allow them to access parts of the RDD
> directly. I think that's already a bit how RDDs are shuffled?
>
> The RDD could stay distributed (no need to collect then broadcast), and
> only necessary transfers would be required.
>
> Is this a bad idea, is it already implemented somewhere (I would love
> it!), or is it something that could add efficiency not only for my use
> case, but maybe for others? Could someone give me some hints about how I
> could add this possibility to Spark? I would probably try to extend an
> RDD into a specific SharedIndexedRDD with a special lookup that would be
> allowed from tasks as a special case, and that would try to contact the
> blockManager and reach the corresponding data from the right worker.
>
> Thanks in advance for your advice,
> Guillaume
> --
> eXenSa
> 	
> *Guillaume PITEL, Président*
> +33(0)626 222 431
> eXenSa S.A.S. <http://www.exensa.com/>
> 41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org
