spark-dev mailing list archives

From Alberto Rodriguez <ardl...@gmail.com>
Subject Re: Exception using the new createDirectStream util method
Date Fri, 20 Mar 2015 07:44:53 GMT
You were absolutely right, Cody!! I have just put a message in the Kafka
topic before creating the DirectStream and now it is working fine!

Do you think that I should open an issue to warn that the Kafka topic must
contain at least one message before the DirectStream creation?
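
For anyone who finds this thread later, here is a rough toy model of what I
think was going on. It is plain Scala with no Spark or Kafka dependency, and
every name in it (EmptyTopicModel, FakeBroker, partitionsFor, leaderOffsets,
demo) is made up for illustration; none of them are real Spark or Kafka APIs.
It only assumes what Cody suggested: the metadata lookup returns nothing for
a topic that has no messages yet.

```scala
// Toy model only: none of these names are Spark or Kafka APIs.
object EmptyTopicModel {

  // Hypothetical stand-in for a broker: topic name -> messages written so far.
  final class FakeBroker {
    private var topics = Map.empty[String, Vector[String]]

    def send(topic: String, message: String): Unit =
      topics = topics.updated(topic, topics.getOrElse(topic, Vector.empty) :+ message)

    def messageCount(topic: String): Long =
      topics.getOrElse(topic, Vector.empty).size.toLong

    // Assumption from the thread: the metadata lookup reports nothing
    // for a topic that has no messages yet.
    def partitionsFor(requested: Set[String]): Set[(String, Int)] =
      requested.filter(t => messageCount(t) > 0).map(t => (t, 0))
  }

  // Left(error text) when nothing can be resolved,
  // otherwise Right((topic, partition) -> number of messages).
  def leaderOffsets(
      broker: FakeBroker,
      requested: Set[String]): Either[String, Map[(String, Int), Long]] = {
    val partitions = broker.partitionsFor(requested)
    if (partitions.isEmpty) Left(s"Couldn't find leader offsets for $partitions")
    else Right(partitions.map(p => p -> broker.messageCount(p._1)).toMap)
  }

  // Looks up the same topic before and after one message is written.
  def demo(): (Either[String, Map[(String, Int), Long]],
               Either[String, Map[(String, Int), Long]]) = {
    val broker = new FakeBroker
    val before = leaderOffsets(broker, Set("test-topic"))
    broker.send("test-topic", "warm-up message")
    val after = leaderOffsets(broker, Set("test-topic"))
    (before, after)
  }
}
```

In this model the first lookup fails with the same "Set()" text I was seeing,
and the second one succeeds once a single message has been written, which
matches what I observed in my test.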

Thank you very much! You've just made my day ;)

2015-03-19 23:08 GMT+01:00 Cody Koeninger <cody@koeninger.org>:

> Yeah, I wouldn't be shocked if Kafka's metadata apis didn't return results
> for topics that don't have any messages.  (sorry about the triple negative,
> but I think you get my meaning).
>
> Try putting a message in the topic and seeing what happens.
>
> On Thu, Mar 19, 2015 at 4:38 PM, Alberto Rodriguez <ardlema@gmail.com>
> wrote:
>
>> Thank you for replying,
>>
>> Ted, I have been debugging, and the getLeaderOffsets method is not
>> appending errors because the findLeaders method, which is called on the
>> first line of getLeaderOffsets, is not returning any leaders.
>>
>> Cody, the topics do not have any messages yet. Could this be an issue??
>>
>> If you guys want to have a look at the code, I've just uploaded it to my
>> GitHub account: big-brother <https://github.com/ardlema/big-brother> (see
>> DirectKafkaWordCountTest.scala).
>>
>> Thank you again!!
>>
>> 2015-03-19 22:13 GMT+01:00 Cody Koeninger <cody@koeninger.org>:
>>
>> > What is the value of your topics variable, and does it correspond to
>> > topics that already exist on the cluster and have messages in them?
>> >
>> > On Thu, Mar 19, 2015 at 3:10 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>> >
>> >> Looking at KafkaCluster#getLeaderOffsets():
>> >>
>> >>           respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
>> >>             if (por.error == ErrorMapping.NoError) {
>> >> ...
>> >>             } else {
>> >>               errs.append(ErrorMapping.exceptionFor(por.error))
>> >>             }
>> >>
>> >> There should be some error other than "Couldn't find leader offsets for
>> >> Set()".
>> >>
>> >> Can you check again?
>> >>
>> >> Thanks
>> >>
>> >> On Thu, Mar 19, 2015 at 12:10 PM, Alberto Rodriguez <ardlema@gmail.com>
>> >> wrote:
>> >>
>> >> > Hi all,
>> >> >
>> >> > I am trying to make the new Kafka and Spark Streaming integration work
>> >> > (direct approach, "no receivers"
>> >> > <http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>).
>> >> > I have created a unit test where I configure and start both ZooKeeper
>> >> > and Kafka.
>> >> >
>> >> > When I try to create the InputDStream using the createDirectStream
>> >> > method of the KafkaUtils class I am getting the following error:
>> >> >
>> >> > org.apache.spark.SparkException: Couldn't find leader offsets for Set()
>> >> > org.apache.spark.SparkException: org.apache.spark.SparkException:
>> >> > Couldn't find leader offsets for Set()
>> >> > at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
>> >> >
>> >> > Following is the code that tries to create the DStream:
>> >> >
>> >> > val messages: InputDStream[(String, String)] =
>> >> >   KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>> >> >     ssc, kafkaParams, topics)
>> >> >
>> >> > Has anyone faced this problem?
>> >> >
>> >> > Thank you in advance.
>> >> >
>> >> > Kind regards,
>> >> >
>> >> > Alberto
>> >> >
>> >>
>> >
>> >
>>
>
>
