spark-user mailing list archives

From Mayur Rustagi <mayur.rust...@gmail.com>
Subject Re: RDD to DStream
Date Fri, 01 Aug 2014 22:37:02 GMT
Nice question :)
Ideally you should use StreamingContext.queueStream: push your RDDs into a
queue and Spark Streaming handles the rest.
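A minimal sketch of the queueStream approach, assuming you already have the historical data as an RDD and want to replay it in order, a fixed number of records per batch. toBatches, QueueStreamReplay and the chunk size of 10 are hypothetical names and values for illustration; queueStream, zipWithIndex and foreachRDD are the real Spark APIs. As far as I know, queueStream is intended for testing and cannot be combined with checkpointing.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object QueueStreamReplay {

  // Hypothetical helper: splits `rdd` into consecutive batches of at most
  // `chunkSize` elements, preserving element order. The index is computed
  // once and cached, so each batch is a filter over the cached index.
  def toBatches[T: ClassTag](rdd: RDD[T], chunkSize: Long): Seq[RDD[T]] = {
    require(chunkSize > 0, "chunkSize must be positive")
    val indexed = rdd.zipWithIndex().cache()
    val total = indexed.count()
    (0L until total by chunkSize).map { start =>
      val end = start + chunkSize
      indexed.filter { case (_, i) => i >= start && i < end }.map(_._1)
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamReplay")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Stand-in for real historical data.
    val history: RDD[Int] = ssc.sparkContext.parallelize(1 to 100, numSlices = 4)

    val queue = mutable.Queue[RDD[Int]]()
    queue ++= toBatches(history, chunkSize = 10L)

    // oneAtATime = true: dequeue exactly one RDD per batch interval.
    val stream = ssc.queueStream(queue, oneAtATime = true)
    stream.foreachRDD { (rdd: RDD[Int], time: Time) =>
      println(s"$time: ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(15000L)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

With oneAtATime = false, each batch would instead consume every RDD currently in the queue as a single union.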
That said, why do you want to convert an RDD to a DStream? Another
workaround folks use is to source the DStream from a folder and move the
files they need reprocessed back into that folder. It's a hack, but much
less of a headache.
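A sketch of the folder-based workaround, assuming a local directory watched with textFileStream. The Spark Streaming guide says file streams should receive files by an atomic move or rename, and that files are picked up by modification time, so a file moved back with an old timestamp may be ignored. The hypothetical requeue helper therefore copies the file into a staging directory on the same filesystem, stamps it with the current time, gives it a fresh name (my understanding is that the stream also skips paths it has recently selected), and then moves it atomically into the watched directory. FolderReplay, requeue, stagingDir and the args(0) directory are all assumptions for illustration.

```scala
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.nio.file.attribute.FileTime

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FolderReplay {

  // Hypothetical helper: re-submits `file` for processing. `stagingDir` must be
  // on the same filesystem as `watchedDir` so the final move can be atomic.
  def requeue(file: Path, stagingDir: Path, watchedDir: Path): Path = {
    // A fresh, timestamp-prefixed name so the stream sees a new path.
    val name = s"${System.currentTimeMillis()}-${file.getFileName}"
    val staged = Files.copy(file, stagingDir.resolve(name), StandardCopyOption.REPLACE_EXISTING)
    // Files are selected by modification time; refresh it before the move.
    Files.setLastModifiedTime(staged, FileTime.fromMillis(System.currentTimeMillis()))
    Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit = {
    val watchedDir = Paths.get(args(0)) // hypothetical: directory the stream monitors
    val conf = new SparkConf().setMaster("local[2]").setAppName("FolderReplay")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.textFileStream(watchedDir.toUri.toString)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```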

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <aniket.bhatnagar@gmail.com> wrote:

> Hi everyone
>
> I haven't been receiving replies to my queries on the distribution list.
> Not pissed, but I am curious whether my messages are actually going
> through. Can someone please confirm that my messages are being delivered
> to this distribution list?
>
> Thanks,
> Aniket
>
>
> On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatnagar@gmail.com> wrote:
>
>> Sometimes it is useful to convert an RDD into a DStream for testing
>> purposes (generating DStreams from historical data, etc.). Is there an easy
>> way to do this?
>>
>> I came up with the following inefficient approach, but I'm not sure whether
>> there is a better way to achieve this. Thoughts?
>>
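>> import scala.reflect.ClassTag
>>
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.streaming.{Duration, StreamingContext, Time}
>> import org.apache.spark.streaming.dstream.{DStream, InputDStream}
>>
>> // Use with: import RDDExtensions._
>> object RDDExtensions {
>>
>>   implicit class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>>
>>     // Group each partition's elements into chunks of at most chunkSize.
>>     def chunked(chunkSize: Int): RDD[Seq[T]] = {
>>       rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>>     }
>>
>>     // Drop the first element of the RDD (global index 0).
>>     def skipFirst(): RDD[T] = {
>>       rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>>     }
>>
>>     def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>         slideDurationMilli: Option[Long] = None): DStream[T] = {
>>       new InputDStream[T](streamingContext) {
>>
>>         @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize)
>>
>>         override def start(): Unit = {}
>>
>>         override def stop(): Unit = {}
>>
>>         override def compute(validTime: Time): Option[RDD[T]] = {
>>           // take(1) returns an Array[Seq[T]]; flatten it so the batch is an RDD[T].
>>           val chunk = currentRDD.take(1).flatten.toSeq
>>           currentRDD = currentRDD.skipFirst()
>>           Some(rdd.sparkContext.parallelize(chunk))
>>         }
>>
>>         override def slideDuration: Duration = {
>>           slideDurationMilli.map(duration => new Duration(duration)).
>>             getOrElse(super.slideDuration)
>>         }
>>       }
>>     }
>>   }
>> }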
>>
>
>
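A variant of the InputDStream approach in the quoted code, sketched under the assumption that element order matters and the indexed RDD can be cached: index the RDD once with zipWithIndex, cache it, and serve each batch as an index-range filter. The quoted skipFirst() chains a new zipWithIndex every batch, and zipWithIndex runs a Spark job whenever the RDD has more than one partition, so both the per-batch cost and the lineage grow over time. ReplayInputDStream and its parameters are hypothetical names; like the original, this sketch is not designed for checkpointing.

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical: replays `source` as a DStream, `chunkSize` records per batch,
// in the RDD's element order.
class ReplayInputDStream[T: ClassTag](
    streamingContext: StreamingContext,
    source: RDD[T],
    chunkSize: Long)
  extends InputDStream[T](streamingContext) {

  require(chunkSize > 0, "chunkSize must be positive")

  // Index every element once and cache the result.
  private val indexed: RDD[(T, Long)] =
    source.zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK)
  private val total: Long = indexed.count()

  @volatile private var nextIndex: Long = 0L

  override def start(): Unit = {}

  override def stop(): Unit = {
    indexed.unpersist(blocking = false)
  }

  override def compute(validTime: Time): Option[RDD[T]] = {
    if (nextIndex >= total) {
      None // source exhausted: no RDD for this batch
    } else {
      // Copy to locals so the filter closure does not capture the DStream itself.
      val from = nextIndex
      val until = math.min(nextIndex + chunkSize, total)
      nextIndex = until
      Some(indexed.filter { case (_, i) => i >= from && i < until }.map(_._1))
    }
  }
}
```

Usage: new ReplayInputDStream(ssc, historyRdd, chunkSize = 1000L).print(), then ssc.start(). Once every element has been emitted, compute returns None and later batches produce nothing.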
