spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: spark streaming 1.3 doubts(force it to not consume anything)
Date Tue, 18 Aug 2015 13:58:41 GMT
The superclass method in DStream is defined as returning an Option[RDD[T]]
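javac rejects the override because of how the two languages type generics. Scala declares `Option` covariant (`Option[+A]`), so Scala treats an `Option[KafkaRDD[...]]` as an `Option[RDD[...]]`. Java generics are invariant, so javac sees no relationship between the two types. The JDK-only sketch below reproduces the rule with hypothetical stand-ins: `BaseStream` plays the role of `DStream`, and `Optional<Collection<T>>` plays the role of `Option[RDD[T]]`. It illustrates the typing rule and is not Spark code. `DirectKafkaInputDStream` has already narrowed the return type on the Scala side, so a Java subclass may still hit a conflict with whichever type it declares. The sketch does not settle that question.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

class ReturnTypeDemo {
    public static void main(String[] args) {
        ListStream<String> stream = new ListStream<>();
        System.out.println(stream.compute(0L).isPresent()); // false: nothing queued yet
        stream.add("a");
        stream.add("b");
        System.out.println(stream.compute(1L).map(Collection::size).orElse(0)); // 2
    }
}

// Hypothetical stand-in for DStream: compute returns Optional<Collection<T>>,
// playing the role of Option[RDD[T]].
abstract class BaseStream<T> {
    abstract Optional<Collection<T>> compute(long batchTime);
}

// Declaring the override as
//     Optional<ArrayList<T>> compute(long batchTime)
// fails with "attempting to use incompatible return type": Java generics are
// invariant, so Optional<ArrayList<T>> is not a subtype of
// Optional<Collection<T>>, even though ArrayList<T> is a Collection<T>.
class ListStream<T> extends BaseStream<T> {
    private final List<T> pending = new ArrayList<>();

    void add(T item) {
        pending.add(item);
    }

    @Override
    Optional<Collection<T>> compute(long batchTime) {
        if (pending.isEmpty()) {
            return Optional.empty(); // the "return None" case
        }
        ArrayList<T> batch = new ArrayList<>(pending);
        pending.clear();
        // The value widens from ArrayList<T> to Collection<T> here; only the
        // declared Optional<...> type has to match the superclass.
        return Optional.of(batch);
    }
}
```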

On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <shushantarora09@gmail.com>
wrote:

> I'm getting a compilation error while overriding the compute method of
> DirectKafkaInputDStream.
>
>
> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
> cannot override compute(org.apache.spark.streaming.Time) in
> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
> return type
>
> [ERROR] found   :
> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>
> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>
>
> class :
>
> public class CustomDirectKafkaInputDstream extends
> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
> kafka.serializer.DefaultDecoder, byte[][]>{
>
> @Override
> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
> byte[][]>> compute(
> Time validTime) {
>
> int processed=processedCounter.value();
> int failed = failedProcessingsCounter.value();
> if((processed==failed)){
> System.out.println("backing off since its 100 % failure");
> return Option.empty();
> }else{
> System.out.println("starting the stream ");
>
> return super.compute(validTime);
> }
> }
> }
>
>
> What should the return type of the compute method be? The superclass
> returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
> byte[][]>>, but the compiler expects
> scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
> there something wrong with the code?
>
> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <cody@koeninger.org>
> wrote:
>
>> Look at the definitions of the Java-specific
>> KafkaUtils.createDirectStream methods (the ones that take a
>> JavaStreamingContext).
>>
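As I recall from the 1.3 source (worth checking against your version), those Java-specific overloads take plain `Class` arguments and build `ClassTag`s from them with `ClassTag(keyClass)` and similar calls. They then wrap the Java `Function`'s `call` method in a Scala function before delegating to the Scala overload. From Java, a `ClassTag` is commonly obtained with `scala.reflect.ClassTag$.MODULE$.apply(byte[].class)`. A `scala.Function1` is usually obtained by subclassing `scala.runtime.AbstractFunction1` and implementing `apply`. The handler should also be `Serializable`, because it travels with the `KafkaRDD`. The sketch below shows only the wrapping step. `JavaStyleFunction`, `ScalaStyleFunction1` and `Adapters` are hypothetical JDK-only stand-ins, so it compiles without Spark or Scala on the classpath.

```java
import java.io.Serializable;

class AdapterDemo {
    public static void main(String[] args) {
        JavaStyleFunction<String, Integer> parse = Integer::parseInt;
        ScalaStyleFunction1<String, Integer> handler = Adapters.toFunction1(parse);
        System.out.println(handler.apply("42")); // 42
    }
}

// Hypothetical stand-in for org.apache.spark.api.java.function.Function:
// call() may throw a checked exception.
interface JavaStyleFunction<A, B> extends Serializable {
    B call(A a) throws Exception;
}

// Hypothetical stand-in for scala.Function1: apply() declares no checked
// exceptions. Extending Serializable makes the lambda returned below
// serializable too, as a Spark message handler would need to be.
interface ScalaStyleFunction1<A, B> extends Serializable {
    B apply(A a);
}

final class Adapters {
    private Adapters() {}

    // Wraps a call()-style function as an apply()-style one. Checked
    // exceptions are rethrown inside a RuntimeException because apply()
    // cannot declare them; unchecked ones pass through unchanged.
    static <A, B> ScalaStyleFunction1<A, B> toFunction1(JavaStyleFunction<A, B> f) {
        return a -> {
            try {
                return f.call(a);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }
}
```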
>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> How do I create a ClassTag in Java? Also, the constructor
>>> of DirectKafkaInputDStream takes a Function1, not a Function, but
>>> KafkaUtils.createDirectStream accepts a Function.
>>>
>>> Below is my overridden DirectKafkaInputDStream:
>>>
>>>
>>> public class CustomDirectKafkaInputDstream extends
>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>
>>> public CustomDirectKafkaInputDstream(
>>> StreamingContext ssc_,
>>> Map<String, String> kafkaParams,
>>> Map<TopicAndPartition, Object> fromOffsets,
>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>> ClassTag<DefaultDecoder> evidence$3,
>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>> evidence$2,
>>> evidence$3, evidence$4, evidence$5);
>>> }
>>> @Override
>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>> byte[][]>> compute(
>>> Time validTime) {
>>> int processed = processedCounter.value();
>>> int failed = failedProcessingsCounter.value();
>>> if((processed==failed)){
>>> System.out.println("backing off since its 100 % failure");
>>> return Option.empty();
>>> }else{
>>> System.out.println("starting the stream ");
>>>
>>> return super.compute(validTime);
>>> }
>>> }
>>> }
>>>
>>>
>>>
>>> To create this stream, I am using:
>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String, String>>conforms());
>>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>
>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>> new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>         ..});
>>> JavaDStream<byte[][]> directKafkaStream = new
>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>
>>>
>>>
>>> How do I pass the ClassTags to the CustomDirectKafkaInputDstream constructor?
>>> And how can I use a Function instead of a Function1?
>>>
>>>
>>>
>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <cody@koeninger.org>
>>> wrote:
>>>
>>>> I'm not aware of an existing API per se, but you could create your own
>>>> subclass of the DStream that returns None for compute() under certain
>>>> conditions.
>>>>
>>>>
>>>>
>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> Hi Cody
>>>>>
>>>>> Can you help here? Does Spark Streaming 1.3 have any API for not
>>>>> consuming any messages in the next few runs?
>>>>>
>>>>> Thanks
>>>>>
>>>>> ---------- Forwarded message ----------
>>>>> From: Shushant Arora <shushantarora09@gmail.com>
>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>>> To: user <user@spark.apache.org>
>>>>>
>>>>>
>>>>> I can't change my streaming application's batch interval at run
>>>>> time. It's always fixed: it always creates jobs at the specified batch
>>>>> interval and enqueues them if an earlier batch has not finished.
>>>>>
>>>>> My requirement is to process the events and post them to an external
>>>>> server. If the external server is down, I want to increase the batch
>>>>> time. That is not possible, but can I make the stream not consume any
>>>>> messages in, say, the next 5 successive runs?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
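Taken together, the requirement and Cody's suggestion come down to a small counter that the overridden `compute` checks each batch. The requirement is to stop consuming for, say, 5 batches after the external server fails. The sketch below uses only the JDK. `BackoffGate`, `reportFailure` and `skipThisBatch` are hypothetical names, and the `Supplier` stands in for the `super.compute(validTime)` call. In the Spark version, the failure signal would come from driver-side state such as the accumulators in the code quoted above.

```java
import java.util.Optional;
import java.util.function.Supplier;

class BackoffDemo {
    public static void main(String[] args) {
        BackoffGate gate = new BackoffGate(5);
        StringBuilder trace = new StringBuilder();
        for (int batch = 0; batch < 8; batch++) {
            // The Supplier stands in for super.compute(validTime).
            Optional<String> out = gate.compute(() -> Optional.of("rdd"));
            trace.append(out.isPresent() ? 'R' : '-');
            if (batch == 1) {
                gate.reportFailure(); // simulate: posting batch 1 to the server failed
            }
        }
        System.out.println(trace); // RR-----R
    }
}

// Hypothetical helper, not a Spark API. After a reported failure, it makes
// compute() return empty for the next `skipBatches` batches.
final class BackoffGate {
    private final int skipBatches;
    private int remaining = 0;

    BackoffGate(int skipBatches) {
        this.skipBatches = skipBatches;
    }

    // Synchronized because the failure may be reported from a different
    // thread than the one generating batches.
    synchronized void reportFailure() {
        remaining = skipBatches;
    }

    // Returns true, and uses up one skip, while backing off.
    synchronized boolean skipThisBatch() {
        if (remaining > 0) {
            remaining--;
            return true;
        }
        return false;
    }

    // Shape of the overridden compute(): empty while backing off,
    // otherwise whatever the wrapped computation produces.
    <R> Optional<R> compute(Supplier<Optional<R>> superCompute) {
        return skipThisBatch() ? Optional.empty() : superCompute.get();
    }
}
```

One consequence is worth verifying against the 1.3 source. When `super.compute` is skipped, the direct stream should not advance its stored offsets. If so, messages that arrive during the back-off would be picked up by the next batch that runs, and that batch may be larger than usual.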
