Subject: Re: KafkaUtils not consuming all the data from all partitions
From: Mukesh Jha <me.mukesh.jha@gmail.com>
To: Gerard Maas <gerard.maas@gmail.com>
Cc: "francois.garillot@typesafe.com" <francois.garillot@typesafe.com>, spark users <user@spark.apache.org>, users@kafka.apache.org
Date: Wed, 7 Jan 2015 20:35:55 +0530

I understand that I have to create 10 parallel streams. My code runs fine
when the number of partitions is ~20, but when I increase the number of
partitions I keep running into this issue. Below is my code to create the
Kafka streams, along with the configs used.
    Map<String, String> kafkaConf = new HashMap<String, String>();
    kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
    kafkaConf.put("group.id", kafkaConsumerGroup);
    kafkaConf.put("consumer.timeout.ms", "30000");
    kafkaConf.put("auto.offset.reset", "largest");
    kafkaConf.put("fetch.message.max.bytes", "20000000");
    kafkaConf.put("zookeeper.session.timeout.ms", "6000");
    kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
    kafkaConf.put("zookeeper.sync.time.ms", "2000");
    kafkaConf.put("rebalance.backoff.ms", "10000");
    kafkaConf.put("rebalance.max.retries", "20");

    String[] topics = kafkaTopicsList;
    int numStreams = numKafkaThreads; // this is 10

    // Ask for numStreams consumer threads per topic.
    Map<String, Integer> topicMap = new HashMap<>();
    for (String topic : topics) {
      topicMap.put(topic, numStreams);
    }

    // One receiver-based input stream per desired receiver.
    List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
    for (int i = 0; i < numStreams; i++) {
      kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
          DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
          StorageLevel.MEMORY_ONLY_SER()));
    }

    // Union the 10 streams into a single DStream for downstream processing.
    JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);
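For completeness: the processing on ks is not shown above. Per the
level-of-parallelism section of the streaming guide that Francois linked
below, the unioned stream can also be repartitioned before processing so
that work is not pinned to the 10 executors hosting the receivers. A rough
sketch of that (the partition count of 90 and the count() action are
placeholders, not from my actual job):

    // Sketch only: redistribute received blocks across the cluster before
    // heavy per-record work. Uses org.apache.spark.api.java.JavaPairRDD and
    // org.apache.spark.api.java.function.Function.
    JavaPairDStream<byte[], byte[]> repartitioned = ks.repartition(90); // e.g. one task per Kafka partition
    repartitioned.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
      @Override
      public Void call(JavaPairRDD<byte[], byte[]> rdd) {
        System.out.println("records in batch: " + rdd.count()); // placeholder action
        return null;
      }
    });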
On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <gerard.maas@gmail.com> wrote:

> Hi,
>
> Could you add the code where you create the Kafka consumer?
>
> -kr, Gerard.
>
> On Wed, Jan 7, 2015 at 3:43 PM, <francois.garillot@typesafe.com> wrote:
>
>> Hi Mukesh,
>>
>> If my understanding is correct, each Stream only has a single Receiver.
>> So, if you have each receiver consuming 9 partitions, you need 10 input
>> DStreams to create 10 concurrent receivers:
>>
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>
>> Would you mind sharing a bit more on how you achieve this?
>>
>> --
>> FG
>>
>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me.mukesh.jha@gmail.com> wrote:
>>
>>> Hi Guys,
>>>
>>> I have a Kafka topic with 90 partitions, and I am running Spark
>>> Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10
>>> kafka-receivers.
>>>
>>> My streaming job is running fine and there is no delay in processing;
>>> it's just that some partitions' data never gets picked up. From the
>>> Kafka console I can see that each receiver is consuming data from 9
>>> partitions, but the lag for some offsets keeps increasing.
>>>
>>> Below are my kafka-consumer parameters.
>>>
>>> Have any of you faced this kind of issue? If so, do you have any
>>> pointers to fix it?
>>>
>>> Map<String, String> kafkaConf = new HashMap<String, String>();
>>> kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>> kafkaConf.put("group.id", kafkaConsumerGroup);
>>> kafkaConf.put("consumer.timeout.ms", "30000");
>>> kafkaConf.put("auto.offset.reset", "largest");
>>> kafkaConf.put("fetch.message.max.bytes", "20000000");
>>> kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>> kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>> kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>> kafkaConf.put("rebalance.backoff.ms", "10000");
>>> kafkaConf.put("rebalance.max.retries", "20");
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> Mukesh Jha

--
Thanks & Regards,

Mukesh Jha