spark-user mailing list archives

From Mukesh Jha <me.mukesh....@gmail.com>
Subject Re: KafkaUtils not consuming all the data from all partitions
Date Wed, 07 Jan 2015 15:05:55 GMT
I understand that I have to create 10 parallel streams. My code runs fine
when the number of partitions is ~20, but when I increase the number of
partitions I keep running into this issue.

Below is my code to create kafka streams, along with the configs used.

    Map<String, String> kafkaConf = new HashMap<String, String>();
    kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
    kafkaConf.put("group.id", kafkaConsumerGroup);
    kafkaConf.put("consumer.timeout.ms", "30000");
    kafkaConf.put("auto.offset.reset", "largest");
    kafkaConf.put("fetch.message.max.bytes", "20000000");
    kafkaConf.put("zookeeper.session.timeout.ms", "6000");
    kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
    kafkaConf.put("zookeeper.sync.time.ms", "2000");
    kafkaConf.put("rebalance.backoff.ms", "10000");
    kafkaConf.put("rebalance.max.retries", "20");
    String[] topics = kafkaTopicsList;
    int numStreams = numKafkaThreads; // this is *10*
    Map<String, Integer> topicMap = new HashMap<>();
    for (String topic: topics) {
      topicMap.put(topic, numStreams);
    }

    List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
    for (int i = 0; i < numStreams; i++) {
      kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
          DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
          StorageLevel.MEMORY_ONLY_SER()));
    }
    JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);
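As an aside, the receiver/partition arithmetic discussed further down the thread (one receiver per input DStream, so 90 partitions over 10 receivers gives 9 each) can be sketched in plain Java. This is only a toy illustration of the expected spread assuming Kafka assigns partitions evenly; the class and method names are made up for the example:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSpread {
    // Returns how many partitions each of numReceivers consumers gets,
    // assuming an even range assignment (remainder goes to the first few).
    static List<Integer> spread(int numPartitions, int numReceivers) {
        List<Integer> counts = new ArrayList<>();
        int base = numPartitions / numReceivers;
        int extra = numPartitions % numReceivers;
        for (int i = 0; i < numReceivers; i++) {
            counts.add(base + (i < extra ? 1 : 0));
        }
        return counts;
    }

    public static void main(String[] args) {
        // 90 partitions over 10 receivers: 9 partitions per receiver
        System.out.println(spread(90, 10));
        // 20 partitions over 10 receivers: 2 partitions per receiver
        System.out.println(spread(20, 10));
    }
}
```

If any receiver dies or stops fetching, its whole slice of partitions stops being consumed, which would show up as growing lag on exactly those partitions.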


On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <gerard.maas@gmail.com> wrote:

> Hi,
>
> Could you add the code where you create the Kafka consumer?
>
> -kr, Gerard.
>
> On Wed, Jan 7, 2015 at 3:43 PM, <francois.garillot@typesafe.com> wrote:
>
>> Hi Mukesh,
>>
>> If my understanding is correct, each Stream only has a single Receiver.
>> So, if you have each receiver consuming 9 partitions, you need 10 input
>> DStreams to create 10 concurrent receivers:
>>
>>
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>
>> Would you mind sharing a bit more on how you achieve this?
>>
>> --
>> FG
>>
>>
>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me.mukesh.jha@gmail.com>
>> wrote:
>>
>>> Hi Guys,
>>>
>>> I have a Kafka topic with 90 partitions, and I am running Spark
>>> Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10
>>> kafka-receivers.
>>>
>>> My streaming job is running fine and there is no delay in processing,
>>> just that data from some partitions is never getting picked up. From the
>>> kafka console I can see that each receiver is consuming data from 9
>>> partitions, but the lag for some offsets keeps on increasing.
>>>
>>> Below are my kafka-consumer parameters.
>>>
>>> Have any of you faced this kind of issue? If so, do you have any
>>> pointers to fix it?
>>>
>>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>>  kafkaConf.put("auto.offset.reset", "largest");
>>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>>  kafkaConf.put("rebalance.max.retries", "20");
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> Mukesh Jha <me.mukesh.jha@gmail.com>
>>>
>>
>>
>


-- 


Thanks & Regards,

*Mukesh Jha <me.mukesh.jha@gmail.com>*
