spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shushant Arora <shushantaror...@gmail.com>
Subject Re: spark streaming 1.3 doubts(force it to not consume anything)
Date Wed, 19 Aug 2015 17:56:27 GMT
will try scala.
Only Reason of not using scala is - never worked on that.

On Wed, Aug 19, 2015 at 7:34 PM, Cody Koeninger <cody@koeninger.org> wrote:

> Is there a reason not to just use scala?  It's not a lot of code... and
> it'll be even less code in scala ;)
>
> On Wed, Aug 19, 2015 at 4:14 AM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> To correct myself - KafkaRDD[K, V, U, T, R] is subclass of RDD[R] but
>> Option<KafkaRDD[K, V, U, T, R] >  is not subclass of Option<RDD[R]>;
>>
>> In scala C[T’] is a subclass of C[T] as per
>> https://twitter.github.io/scala_school/type-basics.html
>> but this is not allowed in java.
>>
>> So is there any workaround to achieve this in java for overriding DirectKafkaInputDStream
>> ?
>>
>>
>> On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> But KafkaRDD[K, V, U, T, R] is not subclass of RDD[R] as per java
>>> generic inheritance is not supported so derived class cannot return
>>>  different genric typed subclass from overriden method.
>>>
>>> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <cody@koeninger.org>
>>> wrote:
>>>
>>>> Option is covariant and KafkaRDD is a subclass of RDD
>>>>
>>>> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> Is it that in scala its allowed for derived class to have any return
>>>>> type ?
>>>>>
>>>>>  And streaming jar is originally created in scala so its allowed for
>>>>> DirectKafkaInputDStream  to return Option[KafkaRDD[K, V, U, T, R]]
>>>>> compute method ?
>>>>>
>>>>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <
>>>>> shushantarora09@gmail.com> wrote:
>>>>>
>>>>>> looking at source code of
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream
>>>>>>
>>>>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T,
>>>>>> R]] = {
>>>>>>     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>>>>     val rdd = KafkaRDD[K, V, U, T, R](
>>>>>>       context.sparkContext, kafkaParams, currentOffsets,
>>>>>> untilOffsets, messageHandler)
>>>>>>
>>>>>>     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>>>>     Some(rdd)
>>>>>>   }
>>>>>>
>>>>>>
>>>>>> But in DStream its def compute (validTime: Time): Option[RDD[T]]
 ,
>>>>>>
>>>>>> So what should  be the return type of custom DStream extends
>>>>>> DirectKafkaInputDStream .
>>>>>> Since I want the behaviour to be same as of DirectKafkaInputDStream
>>>>>>  in normal scenarios and return none in specific scenario.
>>>>>>
>>>>>> And why the same error did not come while extending
>>>>>> DirectKafkaInputDStream from InputDStream ? Since new return type
Option[KafkaRDD[K,
>>>>>> V, U, T, R]] is not subclass of Option[RDD[T] so it should have been
>>>>>> failed?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <cody@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> The superclass method in DStream is defined as returning an
>>>>>>> Option[RDD[T]]
>>>>>>>
>>>>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>
>>>>>>>> Getting compilation error while overriding compute method
of
>>>>>>>> DirectKafkaInputDStream.
>>>>>>>>
>>>>>>>>
>>>>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>>>>> cannot override compute(org.apache.spark.streaming.Time)
in
>>>>>>>> org.apache.spark.streaming.dstream.DStream; attempting to
use incompatible
>>>>>>>> return type
>>>>>>>>
>>>>>>>> [ERROR] found   :
>>>>>>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>>>>
>>>>>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>>>>
>>>>>>>>
>>>>>>>> class :
>>>>>>>>
>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>>>> Time validTime) {
>>>>>>>>
>>>>>>>> int processed=processedCounter.value();
>>>>>>>> int failed = failedProcessingsCounter.value();
>>>>>>>> if((processed==failed)){
>>>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>>>> return Option.empty();
>>>>>>>> }else{
>>>>>>>> System.out.println("starting the stream ");
>>>>>>>>
>>>>>>>> return super.compute(validTime);
>>>>>>>> }
>>>>>>>> }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> What should be the return type of compute method ? super
class is
>>>>>>>> returning Option<KafkaRDD<byte[], byte[], DefaultDecoder,
DefaultDecoder,
>>>>>>>> byte[][]>>  but its expecting
>>>>>>>>  scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
from derived  class . Is
>>>>>>>> there something wring with code?
>>>>>>>>
>>>>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <cody@koeninger.org
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Look at the definitions of the java-specific
>>>>>>>>> KafkaUtils.createDirectStream methods (the ones that
take a
>>>>>>>>> JavaStreamingContext)
>>>>>>>>>
>>>>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> How to create classtag in java ?Also Constructor
>>>>>>>>>> of DirectKafkaInputDStream takes Function1 not Function
but
>>>>>>>>>> kafkautils.createDirectStream allows function.
>>>>>>>>>>
>>>>>>>>>> I have below as overriden DirectKafkaInputDStream.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>>>>>
>>>>>>>>>> public CustomDirectKafkaInputDstream(
>>>>>>>>>> StreamingContext ssc_,
>>>>>>>>>> Map<String, String> kafkaParams,
>>>>>>>>>> Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>>>> Function1<MessageAndMetadata<byte[], byte[]>,
byte[][]>
>>>>>>>>>> messageHandler,
>>>>>>>>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]>
evidence$2,
>>>>>>>>>> ClassTag<DefaultDecoder> evidence$3,
>>>>>>>>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]>
>>>>>>>>>> evidence$5) {
>>>>>>>>>> super(ssc_, kafkaParams, fromOffsets, messageHandler,
evidence$1,
>>>>>>>>>> evidence$2,
>>>>>>>>>> evidence$3, evidence$4, evidence$5);
>>>>>>>>>> }
>>>>>>>>>> @Override
>>>>>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>>>>>> Time validTime) {
>>>>>>>>>> int processe=processedCounter.value();
>>>>>>>>>> int failed = failedProcessingsCounter.value();
>>>>>>>>>> if((processed==failed)){
>>>>>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>>>>>> return Option.empty();
>>>>>>>>>> }else{
>>>>>>>>>> System.out.println("starting the stream ");
>>>>>>>>>>
>>>>>>>>>> return super.compute(validTime);
>>>>>>>>>> }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> To create this stream
>>>>>>>>>> I am using
>>>>>>>>>> scala.collection.immutable.Map<String, String>
scalakafkaParams =
>>>>>>>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>>>>>>>> String>>conforms());
>>>>>>>>>> scala.collection.immutable.Map<TopicAndPartition,
Long>
>>>>>>>>>> scalaktopicOffsetMap=
>>>>>>>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>>>>>>>> Long>>conforms());
>>>>>>>>>>
>>>>>>>>>> scala.Function1<MessageAndMetadata<byte[],
byte[]>, byte[][]>
>>>>>>>>>> handler = new Function<MessageAndMetadata<byte[],
byte[]>, byte[][]>() {
>>>>>>>>>>         ..});
>>>>>>>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>>>>>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams
,scalaktopicOffsetMap,
>>>>>>>>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>>>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> How to pass classTag to constructor in
>>>>>>>>>> CustomDirectKafkaInputDstream ? And how to use Function
instead of
>>>>>>>>>> Function1 ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger
<
>>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm not aware of an existing api per se, but
you could create
>>>>>>>>>>> your own subclass of the DStream that returns
None for compute() under
>>>>>>>>>>> certain conditions.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora
<
>>>>>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Cody
>>>>>>>>>>>>
>>>>>>>>>>>> Can you help here if streaming 1.3 has any
api for not
>>>>>>>>>>>> consuming any message in next few runs?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>>>> From: Shushant Arora <shushantarora09@gmail.com>
>>>>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>>>>> Subject: spark streaming 1.3 doubts(force
it to not consume
>>>>>>>>>>>> anything)
>>>>>>>>>>>> To: user <user@spark.apache.org>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I Can't make my stream application batch
interval to change at
>>>>>>>>>>>> run time . Its always fixed and it always
creates jobs at specified batch
>>>>>>>>>>>> inetval and enqueue them if earleir batch
is not finished.
>>>>>>>>>>>>
>>>>>>>>>>>> My requirement is to process the events and
post them to some
>>>>>>>>>>>> external server and if external server is
down I want to increase the batch
>>>>>>>>>>>> time - that is not possible but can I make
it not to consume any messages
>>>>>>>>>>>> in say next 5 successive runs ?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message