From: Harsha
To: user@storm.apache.org
Subject: Re: Storm Kafka Processing
Date: Tue, 03 Feb 2015 06:47:14 -0800

Vineet,
         In the Kafka producer.send(KeyedMessage<Id, Message>) call, are you passing in an ID? If it is constant or null, your data won't be distributed across all partitions. With a constant ID, all of your messages go to the same partition. With a null key, the 0.8 producer picks one partition at random and keeps sending to it until its next metadata refresh (every 10 minutes by default), so only one partition receives data at a time. It's better to use a random UUID as the key to distribute messages across all of your partitions.
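A minimal sketch of why the key matters, assuming a simple hash partitioner where partition = hash(key) mod 10. This is a simplified stand-in written for illustration, not Kafka's own partitioner class; the class name KeyPartitioningDemo and the constant key "device-42" are hypothetical. A constant key sends every message to one partition, while random UUID keys spread messages across all 10.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

/**
 * Hypothetical illustration of key-hash partitioning. Simplified stand-in,
 * not Kafka's partitioner: partition = non-negative(key.hashCode()) mod n.
 */
final class KeyPartitioningDemo {

    // Partition count of the "test" topic discussed in this thread.
    static final int NUM_PARTITIONS = 10;

    // Hash a non-null key onto one of numPartitions partitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Count how many messages land on each partition for the given keys.
    static Map<Integer, Integer> distribution(List<String> keys, int numPartitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(partitionFor(key, numPartitions), 1, Integer::sum);
        }
        return counts;
    }

    // Every message reuses the same key, like a hard-coded ID.
    static List<String> constantKeys(int n) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            keys.add("device-42"); // hypothetical constant ID
        }
        return keys;
    }

    // Every message gets a fresh random UUID as its key.
    static List<String> uuidKeys(int n) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            keys.add(UUID.randomUUID().toString());
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println("constant key -> " + distribution(constantKeys(10_000), NUM_PARTITIONS));
        System.out.println("UUID keys    -> " + distribution(uuidKeys(10_000), NUM_PARTITIONS));
    }
}
```

With the constant key, the printed map has a single entry holding all 10,000 messages; with UUID keys, all 10 partitions get roughly equal shares.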
-Harsha
 
 
On Tue, Feb 3, 2015, at 12:44 AM, Vineet Mishra wrote:
Do you mean that the events published to Kafka are not being distributed across partitions?
 
When creating the topic, I made sure to set the number of partitions to 10 and the replication factor to 3.
 
Is this affected by how I am writing to Kafka?
 
Thanks!
 
On Tue, Feb 3, 2015 at 1:50 PM, Andrew Neilson <arsneilson@gmail.com> wrote:
The behaviour you are describing sounds like your topology is processing a small backlog of events built up in each partition and then catching up to realtime where events are only being published to one of the 10 partitions at a time. I will echo Harsha in suggesting that you verify you are actually publishing to all partitions (important: this is not the default behaviour).
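The one-partition-at-a-time pattern can be simulated. This hypothetical sketch assumes the Kafka 0.8 producer behaviour for null keys described in the Kafka FAQ: a random partition is chosen and reused until the next metadata refresh, controlled by topic.metadata.refresh.interval.ms (600000 ms, i.e. 10 minutes, by default). The class StickyNullKeyDemo and its one-message-per-second traffic model are illustrative assumptions, not producer code.

```java
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical simulation: for messages without a key, pick a random
 * partition and keep using it until the next (assumed) metadata refresh.
 */
final class StickyNullKeyDemo {

    private final int numPartitions;
    private final long refreshIntervalSec;
    private final Random random;
    private int currentPartition = -1; // -1 means no partition chosen yet
    private long lastRefreshSec = 0;

    StickyNullKeyDemo(int numPartitions, long refreshIntervalSec, long seed) {
        this.numPartitions = numPartitions;
        this.refreshIntervalSec = refreshIntervalSec;
        this.random = new Random(seed);
    }

    // Partition used for a null-key message sent at time nowSec.
    int partitionForNullKey(long nowSec) {
        boolean refreshDue = nowSec - lastRefreshSec >= refreshIntervalSec;
        if (currentPartition < 0 || refreshDue) {
            // A re-pick may land on the same partition again.
            currentPartition = random.nextInt(numPartitions);
            lastRefreshSec = nowSec;
        }
        return currentPartition;
    }

    // Distinct partitions touched by one message per second over [startSec, endSec).
    Set<Integer> partitionsUsed(long startSec, long endSec) {
        Set<Integer> used = new TreeSet<>();
        for (long t = startSec; t < endSec; t++) {
            used.add(partitionForNullKey(t));
        }
        return used;
    }

    public static void main(String[] args) {
        StickyNullKeyDemo producer = new StickyNullKeyDemo(10, 600, 7L);
        System.out.println("minutes 0-10:  " + producer.partitionsUsed(0, 600));
        System.out.println("minutes 10-20: " + producer.partitionsUsed(600, 1200));
        StickyNullKeyDemo fresh = new StickyNullKeyDemo(10, 600, 7L);
        System.out.println("minutes 0-60:  " + fresh.partitionsUsed(0, 3600));
    }
}
```

Under this model each 10-minute window writes to exactly one partition, so after any backlog is drained, only the spout executor reading that partition has new data to emit.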
 
On Tue, Feb 3, 2015 at 12:05 AM, Vineet Mishra <clearmidoubt@gmail.com> wrote:
Hi Harsha,
 
Based on your suggestion, I made the specified changes by switching to the suggested storm-kafka bundle.
 
Although I could see a difference between the previous bundle and the current one, I was not satisfied with the way the spouts were processing. Here is what I observed:
 
The spout was running with 10 executors. When the job started, about half of the executors (5) began processing in parallel to ingest the data.
 
Once the counts reached around a million, the parallelism dropped and processing eventually became serial (one executor at a time).
 
Executors (All time)
Id       Uptime   Host   Port  Emitted  Transferred  Complete latency (ms)   Acked  Failed
[2-2]    13m 54s  host3  6703        0            0                  0.000       0       0
[3-3]    13m 52s  host2  6702   318300       318300                  4.789  318160       0
[4-4]    13m 52s  host3  6702   434200       434200                  7.064  434380       0
[5-5]    13m 53s  host2  6701       20           20                  0.000       0       0
[6-6]    13m 55s  host3  6701        0            0                  0.000       0       0
[7-7]    13m 51s  host2  6700    25000        25000                  4.122   24500       0
[8-8]    13m 51s  host3  6700   248360       248360                  9.514  245780       0
[9-9]    13m 52s  host2  6703        0            0                  0.000       0       0
[10-10]  13m 54s  host3  6703   235220       235220                  9.250  233200       0
[11-11]  13m 52s  host2  6702   204420       204420                 10.382  205800       0
 
I have around 0.2 billion (200 million) events ingested into Kafka that need to be processed through Storm in real time, but I am not sure what is causing this unexpected, intermittent behavior or how I can prevent it in the future.
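A rough check on the executor table above. The emitted counts are copied from it. Treating every executor as up for 13m 52s (832 s) is a simplifying assumption, and Storm UI counts are sampled estimates, so the results are approximate. The class name SpoutThroughputEstimate is hypothetical. The arithmetic gives about 1.47 million tuples emitted in total, 7 of 10 executors emitting anything (only 6 emitting more than a handful), and roughly 1,760 tuples/s overall. At that rate, 0.2 billion events would take on the order of 31 hours to process.

```java
/**
 * Back-of-the-envelope arithmetic on the "Executors (All time)" table.
 * Assumes a common uptime of 13m 52s for all executors.
 */
final class SpoutThroughputEstimate {

    // Emitted counts for executors [2-2] .. [11-11], in table order.
    static final long[] EMITTED = {0, 318_300, 434_200, 20, 0, 25_000, 248_360, 0, 235_220, 204_420};

    static final long UPTIME_SEC = 13 * 60 + 52;     // assumed common uptime: 832 s
    static final long BACKLOG_EVENTS = 200_000_000L; // "around 0.2 billion" events

    // Sum of emitted tuples across all spout executors.
    static long totalEmitted() {
        long sum = 0;
        for (long e : EMITTED) {
            sum += e;
        }
        return sum;
    }

    // Number of executors that emitted at least minEmitted tuples.
    static int executorsEmittingAtLeast(long minEmitted) {
        int count = 0;
        for (long e : EMITTED) {
            if (e >= minEmitted) {
                count++;
            }
        }
        return count;
    }

    // Aggregate spout throughput over the assumed uptime.
    static double tuplesPerSecond() {
        return (double) totalEmitted() / UPTIME_SEC;
    }

    // Hours needed to emit the given number of events at the observed rate.
    static double hoursToProcess(long events) {
        return events / tuplesPerSecond() / 3600.0;
    }

    public static void main(String[] args) {
        System.out.println("total emitted: " + totalEmitted());
        System.out.println("executors that emitted anything: " + executorsEmittingAtLeast(1));
        System.out.println("executors with >= 1000 emitted: " + executorsEmittingAtLeast(1_000));
        System.out.printf("approx. throughput: %.0f tuples/s%n", tuplesPerSecond());
        System.out.printf("hours for 0.2 billion events at that rate: %.1f%n", hoursToProcess(BACKLOG_EVENTS));
    }
}
```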
 
Any expert suggestions would be appreciated.
 
Thanks!
 
 
 
On Mon, Feb 2, 2015 at 11:53 PM, Vineet Mishra <clearmidoubt@gmail.com> wrote:
I am already running Kafka with 10 partitions and a replication factor of 3, which matches the size of my cluster.
 
bin/kafka-topics.sh --create --zookeeper host1:2181,host2:2181,host3:2181 --replication-factor 3 --partitions 10 --topic test
 
and I am running the Kafka Storm topology with the spout's executor count set to 10:
 
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 10);
 
I suspect that ever since I changed the replication factor and the number of partitions from the previous setup*, I have been seeing higher latency.
 
* Previous setup: bin/kafka-topics.sh --create --zookeeper host1:2181,host2:2181,host3:2181 --replication-factor 1 --partitions 1 --topic test
 
I will try the storm-kafka bundle you suggested above. Hopefully that helps!
 
Thanks!
 
On Mon, Feb 2, 2015 at 10:30 PM, Harsha <storm@harsha.io> wrote:

Vineet,
       Can you try using the spout that ships with Storm (https://github.com/apache/storm/tree/master/external/storm-kafka)? It is published to the Maven repository, so you can use the following dependency:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.3</version>
</dependency>
 
If your topic has 10 partitions, make sure you configure your Kafka spout with a parallelism of 10. Also make sure that, on the producer side, you are writing data to all 10 partitions so that your Kafka spout fetches data from all of them.
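To see why spout parallelism should match the partition count, here is a simplified model of partition-to-task assignment. It assumes a round-robin split in which spout task i reads partitions i, i + N, i + 2N, ... for N spout tasks. storm-kafka appears to use a scheme like this, but verify against KafkaUtils in the version you deploy. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (assumption): with totalTasks spout tasks, task i reads
 * partitions i, i + totalTasks, i + 2 * totalTasks, ...
 */
final class SpoutPartitionAssignment {

    // Partition indexes that the spout task with index taskIndex would read.
    static List<Integer> partitionsForTask(int numPartitions, int totalTasks, int taskIndex) {
        if (taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("taskIndex must be in [0, totalTasks)");
        }
        List<Integer> assigned = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    // How many spout tasks have at least one partition to read.
    static int busyTasks(int numPartitions, int totalTasks) {
        int busy = 0;
        for (int t = 0; t < totalTasks; t++) {
            if (!partitionsForTask(numPartitions, totalTasks, t).isEmpty()) {
                busy++;
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        // 10 partitions, spout parallelism 10: one partition per task.
        System.out.println("10 partitions / 10 tasks -> tasks with partitions: " + busyTasks(10, 10));
        // 1 partition (the earlier topic), spout parallelism 10: nine tasks idle.
        System.out.println("1 partition / 10 tasks -> tasks with partitions: " + busyTasks(1, 10));
        // 10 partitions, spout parallelism 5: two partitions per task.
        System.out.println("task 0 of 5 reads partitions " + partitionsForTask(10, 5, 0));
    }
}
```

Note that a task with an assigned partition only emits when that partition receives data. Even with 10 tasks for 10 partitions, a producer that writes to one partition at a time keeps only one executor busy.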
 
-Harsha
 
 
 
On Mon, Feb 2, 2015, at 08:55 AM, Vineet Mishra wrote:
Hi Harsha,
 
I am using the storm.kafka.KafkaSpout.KafkaSpout implementation from
 
https://github.com/wurstmeister/storm-kafka-0.8-plus
Thanks!
 
On Mon, Feb 2, 2015 at 8:14 PM, Harsha <storm@harsha.io> wrote:

Vineet,
        Which Kafka spout are you using?
 
-Harsha
 
 
 
On Mon, Feb 2, 2015, at 05:25 AM, Vineet Mishra wrote:
Hi,
 
I am running a Kafka-Storm engine to process real-time data on a 3-node distributed cluster.
 
Currently I have set 10 executors for the Storm spout, but I don't think they are running in parallel.
Earlier I was running the Kafka topology with a replication factor of 1 and a single partition (which seemed to run comparatively faster); now that I have set the replication factor to 3 and the number of partitions to 10, I see a performance degradation.
 
Is there any way I can fully utilize the available resources and get the maximum event-processing throughput?
 
I urgently need expert suggestions.
 
Thanks!