spark-user mailing list archives

From Dylan Hogg <dylanh...@gmail.com>
Subject Re: Analyzing consecutive elements
Date Thu, 22 Oct 2015 11:43:53 GMT
Hi Sampo,

You could try zipWithIndex followed by a self-join on shifted index
values, like this:

val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
val rdd = sc.parallelize(arr)
val sorted = rdd.sortByKey(true)

// Index each element and key by its position: (index, (key, value))
val zipped = sorted.zipWithIndex.map(x => (x._2, x._1))
// Self-join against a copy whose indices are shifted down by one, so
// index i pairs element i with element i + 1; the join doesn't preserve
// order, hence the final sortBy on the index.
val pairs = zipped.join(zipped.map(x => (x._1 - 1, x._2))).sortBy(_._1)

This produces the consecutive elements as pairs in the RDD, ready for
further processing:
(0,((1,A),(3,B)))
(1,((3,B),(7,C)))
(2,((7,C),(8,D)))
(3,((8,D),(9,E)))

There are probably more efficient ways to do this, but if your dataset
isn't too big it should work for you.
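For intuition, here is a sketch of the same pairing on a plain Scala collection, where sliding(2) yields consecutive pairs directly (this is local-only and won't scale like an RDD; for large data you could also look at the sliding helper in spark-mllib's RDDFunctions, which does something similar per partition):

```scala
// Local-collection analogue of the zip-and-shift join above.
val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))

// Sort by key, then take each window of two consecutive elements.
val pairs = arr.sortBy(_._1)
  .sliding(2)
  .map { case Array(a, b) => (a, b) }
  .toList
// pairs: List(((1,A),(3,B)), ((3,B),(7,C)), ((7,C),(8,D)), ((8,D),(9,E)))
```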

Cheers,
Dylan.


On 22 October 2015 at 17:35, Sampo Niskanen <sampo.niskanen@wellmo.com>
wrote:

> Hi,
>
> I have analytics data with timestamps on each element.  I'd like to
> analyze consecutive elements using Spark, but haven't figured out how to do
> this.
>
> Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E]
> to an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze
> time-related elements.)
>
> How can this be achieved?
>
>
> *    Sampo Niskanen*
>
> *Lead developer / Wellmo*
>     sampo.niskanen@wellmo.com
>     +358 40 820 5291
>
>
