From: Miloš Solujić
Date: Tue, 8 Jul 2014 18:34:26 +0200
Subject: Re: Kafka trident getting stuck
To: user@storm.incubator.apache.org

Yep, pretty sure. I'm producing via the internal kafka-producer.sh, the same
method I used for the initial messages (sent before the first launch of the
topology; those were consumed and processed just fine).

As for maxSpoutPending, I first tried 10, then removed it (leaving the
default value).

On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi wrote:

> Are you sure you are producing new messages into the same Kafka
> topic? What number did you set maxSpoutPending to?
>
> On Tuesday, July 8, 2014, Miloš Solujić wrote:
>
>> Thanks, Danijel, for your quick suggestion.
>>
>> I tried lowering and then removing all the performance settings (those
>> were left over from load testing on one machine).
>>
>> Still the same result: no matter what, new messages are not taken from
>> Kafka after the topology is redeployed.
>>
>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi
>> <danijel@schiavuzzi.com> wrote:
>>
>>> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10).
>>> In Trident, setMaxSpoutPending refers to the number of batches, not
>>> tuples as in plain Storm. Values that are too high may cause blockages
>>> like the one you describe.
>>>
>>> On Tuesday, July 8, 2014, Miloš Solujić wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope that my
>>>> question is not too dumb. Here it goes:
>>>>
>>>> I'm using the latest stable storm and storm-kafka = 0.9.2-incubating.
>>>>
>>>> I've set up a test cluster using the wirbelsturm tool with an unchanged
>>>> yaml (I just uncommented the kafka machine).
>>>>
>>>> Here is the config snippet for my Trident topology:
>>>>
>>>>     BrokerHosts zk = new ZkHosts("zookeeper1");
>>>>     TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
>>>>
>>>>     kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>     kafkaConf.fetchSizeBytes = 10000;
>>>>     kafkaConf.forceFromStart = true;
>>>>
>>>>     Config stormConfig = new Config();
>>>>     stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>>>     stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>>>     stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>>>     stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>>>>     // performance settings
>>>>     stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>>>>     stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>>>>     stormConfig.setMaxSpoutPending(100000);
>>>>
>>>>     if (args != null && args.length > 0) {
>>>>         StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
>>>>                 BuildTridentScoreTopology.build(kafkaConf));
>>>>     } else {...}
>>>>
>>>> I created the 'scores' topic in Kafka and pushed a few test messages
>>>> before starting the topology, with kafkaConf.forceFromStart = true. The
>>>> topology processed those messages just fine and stored them in the
>>>> Trident state (Couchbase).
>>>>
>>>> All new messages are simply ignored!
>>>>
>>>> After redeploying the topology (both with forceFromStart = true and
>>>> forceFromStart = false), no more messages are ingested from Kafka.
>>>>
>>>> Here is the worker log for one topology deployment and a short run:
>>>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>>>
>>>> These are the VMs that host this Storm cluster:
>>>> 10.0.0.241 zookeeper1
>>>> 10.0.0.101 supervisor1
>>>> 10.0.0.21 kafka1
>>>> 10.0.0.251 nimbus1
>>>>
>>>> Thanks,
>>>> Milos
>>>
>>> --
>>> Danijel Schiavuzzi
>>>
>>> E: danijel@schiavuzzi.com
>>> W: www.schiavuzzi.com
>>> T: +385989035562
>>> Skype: danijels7
>
> --
> Danijel Schiavuzzi
>
> E: danijel@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
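To make the batches-versus-tuples point from the thread concrete, here is a rough back-of-the-envelope sketch in plain Java. It uses no Storm classes. The class name PendingEstimate and the method maxInFlightBytes are made up for illustration, and the estimate rests on two assumptions that are not stated in the thread: that each pending Trident batch fetches at most fetchSizeBytes from each Kafka partition, and that the 'scores' test topic has a single partition.

```java
// Illustrative arithmetic only; PendingEstimate is a hypothetical helper,
// not part of Storm or storm-kafka.
public class PendingEstimate {

    // Rough upper bound on bytes in flight, ASSUMING every pending batch
    // fetches at most fetchSizeBytes from each partition.
    static long maxInFlightBytes(long maxSpoutPending, long fetchSizeBytes, int partitions) {
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(Math.multiplyExact(maxSpoutPending, fetchSizeBytes), partitions);
    }

    public static void main(String[] args) {
        long fetchSizeBytes = 10_000; // kafkaConf.fetchSizeBytes from the snippet
        int partitions = 1;           // assumption: single-partition test topic

        // Original setting: 100000 pending batches.
        System.out.println(maxInFlightBytes(100_000, fetchSizeBytes, partitions)); // prints 1000000000
        // Suggested setting: 10 pending batches.
        System.out.println(maxInFlightBytes(10, fetchSizeBytes, partitions));      // prints 100000
    }
}
```

Under those assumptions, the original setting allows on the order of 1 GB of fetched data to be outstanding, against about 100 KB with a value of 10. This only shows the scale of the difference; it does not explain why new messages are ignored after a redeploy.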