spark-dev mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: New JavaRDD Inside JavaPairDStream
Date Fri, 11 Sep 2015 19:43:14 GMT
No, in general you can't make new RDDs in code running on the executors.

It looks like your properties file is a constant, why not process it at the
beginning of the job and broadcast the result?
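
A minimal sketch of that suggestion, under stated assumptions: the SparkContext exists only on the driver, so the file is parsed once there into a plain serializable map, and that map is what gets broadcast. The class name FeatureList, the method parse, and the sample keys and values are hypothetical and not from the original thread. The Spark calls appear only as comments so the block compiles without Spark on the classpath. Unlike the quoted code, this splits each line on the first '=' and trims whitespace.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not from the original thread): parses "key=value"
// lines into a plain map on the driver, before the streaming job starts.
class FeatureList {

    // Splits each line on the first '=' and skips lines without a key.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> features = new HashMap<>(); // HashMap is Serializable
        for (String line : lines) {
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue; // blank or malformed line: nothing before '='
            }
            features.put(line.substring(0, eq).trim(),
                         line.substring(eq + 1).trim());
        }
        return features;
    }

    public static void main(String[] args) {
        // Made-up sample lines; a real job would read featurelist.properties
        // on the driver, e.g. with java.nio.file.Files.readAllLines(...).
        Map<String, String> features = parse(Arrays.asList(
                "length=com.example.LengthFeature",
                "",
                "depth=com.example.DepthFeature"));
        System.out.println(features.get("length"));

        // With Spark available, the driver would then do roughly:
        //   Broadcast<Map<String, String>> featuresBc = jsc.broadcast(features);
        // and the function passed to lines.mapToPair(...) would read
        //   featuresBc.value().get(someKey)
        // instead of calling jsc.textFile(...) on an executor.
    }
}
```

Only the broadcast handle is captured by the function that runs on the executors, so neither the JavaSparkContext nor an RDD has to be serialized into the closure.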

On Fri, Sep 11, 2015 at 2:09 PM, Rachana Srivastava <
Rachana.Srivastava@markmonitor.com> wrote:

> Hello all,
>
>
>
> Can we create a JavaRDD while processing a stream from Kafka, for example?
> The following code throws a serialization exception. I am not sure whether
> this is feasible.
>
>
>
>   JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));
>
>   JavaPairReceiverInputDStream<String, String> messages =
>       KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
>
>   JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>     public String call(Tuple2<String, String> tuple2) {
>       return tuple2._2();
>     }
>   });
>
>   JavaPairDStream<String, String> wordCounts = lines.mapToPair(new PairFunction<String, String, String>() {
>     public Tuple2<String, String> call(String urlString) {
>       String propertiesFile = "/home/cloudera/Desktop/sample/input/featurelist.properties";
>       JavaRDD<String> propertiesFileRDD = jsc.textFile(propertiesFile);
>       JavaPairRDD<String, String> featureKeyClassPair = propertiesFileRDD.mapToPair(
>           new PairFunction<String, String, String>() {
>             public Tuple2<String, String> call(String property) {
>               return new Tuple2(property.split("=")[0], property.split("=")[1]);
>             }
>           });
>       featureKeyClassPair.count();
>       return new Tuple2<String, String>(urlString, featureScore);
>     }
>   });
>
>
>
