spark-user mailing list archives

From madhu phatak <phatak....@gmail.com>
Subject Re: Forcing RDD computation with something else than count() ?
Date Tue, 21 Jan 2014 12:02:09 GMT
Hi,
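You can call a less expensive action such as first() or take() to trigger the
computation. Note that these only evaluate as many partitions as they need to
return the requested elements, so unlike count() they do not materialize the
whole RDD. An action that visits every partition, such as foreach with an
empty body, does.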
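
As a rough illustration of the difference between these actions, here is a minimal sketch. It assumes a local SparkContext and a small made-up RDD; the object name ForceComputation and the helper materialize are hypothetical and not part of Spark or of the original code.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object ForceComputation {
  // Hypothetical helper: runs a job over every partition of `rdd` and
  // discards the elements, so nothing is collected on the driver.
  def materialize[T](rdd: RDD[T]): RDD[T] = {
    rdd.foreach(_ => ())
    rdd
  }

  def main(args: Array[String]): Unit = {
    // Local context for illustration only; a real job would reuse its own.
    val conf = new SparkConf().setAppName("force-computation").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Made-up data with the same shape as in the question.
      val rdd = sc.parallelize(1 to 1000, 4)
        .map(i => ((i, i + 1), i.toDouble))
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

      // Cheap, but only evaluates as many partitions as needed for one element.
      println("first element: " + rdd.first())

      // Touches every partition, so the persisted RDD ends up fully cached.
      materialize(rdd)

      // count() also visits every partition; it just tallies elements as well.
      println("elements: " + rdd.count())
    } finally sc.stop()
  }
}
```

Since count() does little more than iterate over each partition and add up the sizes, the saving from switching to foreach is likely to be small. The more important point is that first() and take() are not a substitute when the goal is to cache the full RDD.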


On Tue, Jan 21, 2014 at 2:32 PM, Guillaume Pitel <guillaume.pitel@exensa.com> wrote:

>  Hi,
>
> I'm struggling a bit with something: I have several datasets of type
> RDD[((Int,Int),Double)] that I want to merge.
>
> I've tried union+reduceByKey and cogroup+mapValues, but in every case,
> if I don't force the computation of the RDDs, the final task
> fails, probably because the dataset is too big.
>
> Currently I use count() and persist() to force the computation, but I
> suspect this adds unnecessary overhead. Is there any other way
> to force the computation?
>
> Or any advice on this kind of problem? Numbers: 10 datasets of 200M-400M
> elements each; 2B elements once merged.
>
> My code below :
>
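>   import org.apache.spark.SparkContext._  // pair-RDD implicits (needed on older Spark releases)
>   import org.apache.spark.rdd.RDD
>   import org.apache.spark.storage.StorageLevel
>
>   // Recursively merges a list of co-occurrence RDDs, summing the values per key.
>   def reduceCooccurrences(datasets: List[RDD[(Cooccurrence, Double)]]): RDD[(Cooccurrence, Double)] = {
>     println("Reducing a list of " + datasets.length)
>     val result = if (datasets.length == 1)
>       datasets(0)
>     else if (datasets.length == 2) {
>       // count() forces each (persisted) input to be computed and cached.
>       datasets.foreach { rdd => println("- RDD of " + rdd.count() + " elements") }
>       val r = datasets(0).cogroup(datasets(1)).mapValues { case (sda, sdb) =>
>         sda.sum + sdb.sum
>       }
>       // r is still lazy at this point, so the inputs may have to be recomputed
>       // when result.count() below finally evaluates it.
>       datasets.foreach(_.unpersist())
>       r
>     }
>     else if (datasets.length == 3) {
>       datasets.foreach { rdd => println("- RDD of " + rdd.count() + " elements") }
>       val r = datasets(0).cogroup(datasets(1), datasets(2)).mapValues { case (sda, sdb, sdc) =>
>         sda.sum + sdb.sum + sdc.sum
>       }
>       datasets.foreach(_.unpersist())
>       r
>     } else {
>       // Split the list in two, reduce each half recursively, then merge the halves.
>       val (b, e) = datasets.splitAt(datasets.length / 2)
>       reduceCooccurrences(b).cogroup(reduceCooccurrences(e)).mapValues { case (sda, sdb) =>
>         sda.sum + sdb.sum
>       }
>     }
>     // Cache the merged result and force its computation with count().
>     result.persist(StorageLevel.MEMORY_AND_DISK_SER)
>     println("Total elements " + result.count())
>     result
>   }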
>
>   import org.apache.hadoop.io.DoubleWritable
>   import org.apache.spark.SparkContext
>
>   // Loads every input, merges them, optionally symmetrizes, and writes the result.
>   def mergeCooccurrences(sc: SparkContext, inputs: List[String], output: String,
>                          symmetric: Boolean, dumpText: Boolean): Unit = {
>     val nbSets = inputs.length
>     // Pre-aggregate each input by key and cache it in serialized form.
>     val datasets = inputs.map { input =>
>       SparseMatrixIO.load(sc, input).groupByKey().mapValues {
>         sd => sd.map(_.get()).sum
>       }.persist(StorageLevel.MEMORY_AND_DISK_SER)
>     }
>     val dataset = reduceCooccurrences(datasets)
>     val result = if (symmetric) {
>       // Emit each pair together with its mirrored (j, i) key, then sum per key.
>       dataset.flatMap {
>         case c => Seq(c, (new Cooccurrence(c._1.j, c._1.i), c._2))
>       }.groupByKey().mapValues { sd => new DoubleWritable(sd.sum) } // values are plain Double here
>     } else {
>       dataset.mapValues(new DoubleWritable(_))
>     }
>     SparseMatrixIO.write(output, result, dumpText)
>   }
>
>
> NB:
> Cooccurrence <=> (Int,Int)
> and SparseMatrixIO.load and SparseMatrixIO.write basically call
> newAPIHadoopFile and saveAsNewAPIHadoopFile
>
> --
>  *Guillaume PITEL, Président*
> +33(0)6 25 48 86 80
>
> eXenSa S.A.S. <http://www.exensa.com/>
>  41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>
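
For the merge itself, one alternative to the pairwise cogroup recursion quoted above is a single union followed by reduceByKey, which combines values on the map side before the shuffle. The following is a minimal sketch, assuming the keys are plain (Int, Int) tuples as stated in the NB. The names MergeSketch and mergeAll and the numPartitions parameter are hypothetical, and whether this avoids the failure at 2B elements has not been tested.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (needed on older Spark releases)
import org.apache.spark.rdd.RDD

object MergeSketch {
  // Stand-in for the poster's Cooccurrence class.
  type Cooccurrence = (Int, Int)

  // Sums the values of identical keys across all input RDDs in one shuffle.
  def mergeAll(sc: SparkContext,
               datasets: Seq[RDD[(Cooccurrence, Double)]],
               numPartitions: Int): RDD[(Cooccurrence, Double)] =
    sc.union(datasets).reduceByKey(_ + _, numPartitions)

  def main(args: Array[String]): Unit = {
    // Local context and tiny inputs, for illustration only.
    val sc = new SparkContext(
      new SparkConf().setAppName("merge-sketch").setMaster("local[2]"))
    try {
      val a = sc.parallelize(Seq(((1, 2), 1.0), ((3, 4), 2.0)))
      val b = sc.parallelize(Seq(((1, 2), 0.5), ((5, 6), 4.0)))
      val merged = mergeAll(sc, Seq(a, b), numPartitions = 4)
      merged.collect().sortBy(_._1).foreach(println)
    } finally sc.stop()
  }
}
```

Raising numPartitions keeps each reduce task's share of the keys smaller, which may help when a single task runs out of memory; the right value depends on the cluster and is an assumption to tune.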



-- 
https://github.com/zinnia-phatak-dev/Nectar
