Hi Sampo,
You could try zipWithIndex followed by a self-join with the index
values shifted by one, like this:
val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
val rdd = sc.parallelize(arr)
// Sort by key so that zipWithIndex assigns indices in timestamp order
val sorted = rdd.sortByKey(true)
// Key each element by its position: (index, element)
val zipped = sorted.zipWithIndex.map(x => (x._2, x._1))
// Shift the second copy's indices down by one, so element i joins with element i+1
val pairs = zipped.join(zipped.map(x => (x._1 - 1, x._2))).sortBy(_._1)
This produces the consecutive elements as pairs in the RDD for further
processing:
(0,((1,A),(3,B)))
(1,((3,B),(7,C)))
(2,((7,C),(8,D)))
(3,((8,D),(9,E)))
There are probably more efficient ways to do this, but if your dataset
isn't too big it should work for you.
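For reference, on a plain local collection the same pairing is just sliding(2) — a quick sketch in standard Scala (no Spark; the object name is mine), in case it helps clarify what the join computes:

```scala
object ConsecutivePairsSketch {
  def main(args: Array[String]): Unit = {
    // Already-sorted sample data, matching the RDD example above
    val sorted = Vector((1, "A"), (3, "B"), (7, "C"), (8, "D"), (9, "E"))
    // sliding(2) yields each adjacent window once: ((1,A),(3,B)), ((3,B),(7,C)), ...
    val pairs = sorted.sliding(2).collect { case Seq(a, b) => (a, b) }.toVector
    pairs.foreach(println)
  }
}
```

If I remember right, Spark's MLlib also ships a sliding helper (org.apache.spark.mllib.rdd.RDDFunctions.sliding) that handles partition boundaries for you, which might be worth a look for larger datasets.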
Cheers,
Dylan.
On 22 October 2015 at 17:35, Sampo Niskanen <sampo.niskanen@wellmo.com>
wrote:
> Hi,
>
> I have analytics data with timestamps on each element. I'd like to
> analyze consecutive elements using Spark, but haven't figured out how to do
> this.
>
> Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E]
> to an RDD [(A,B), (B,C), (C,D), (D,E)]. (Or some other way to analyze
> time-related elements.)
>
> How can this be achieved?
>
>
> * Sampo Niskanen*
>
> *Lead developer / Wellmo*
> sampo.niskanen@wellmo.com
> +358 40 820 5291
>
>
