From: Miloš Solujić <milos.solujic@gmail.com>
To: user@storm.incubator.apache.org
Date: Thu, 10 Jul 2014 14:08:07 +0200
Subject: Re: Kafka trident getting stuck

Yes

On 10 Jul 2014 14:03, "Danijel Schiavuzzi" <danijel@schiavuzzi.com> wrote:
> Did you kill your topology before clearing the Zookeeper data?
>
> On Jul 10, 2014 1:24 PM, "Miloš Solujić" <milos.solujic@gmail.com> wrote:
> >
> > Thanks Danijel for taking interest in my problem.
> >
> > That's exactly the feeling I've got (that the Zookeeper data is corrupted), so I purged the info about it via zkCli.sh.
> >
> > Now I've got some lower-level issues:
> >
> > 2014-07-10 11:00:13 b.s.d.worker [INFO] Worker 04a17a6b-5aea-47ce-808b-218c4bcc1d51 for storm tridentOpaqueTest-topology-3-1404989752 on e4cdd619-c7e4-40f0-941e-352ac41daf6e:6701 has finished loading
> > 2014-07-10 11:00:14 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092}}
> > 2014-07-10 11:00:14 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> > 2014-07-10 11:00:14 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=zookeeper1:2181 sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@203c6f3e
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using SASL (unknown error)
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Socket connection established to zookeeper1/10.0.0.241:2181, initiating session
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Session establishment complete on server zookeeper1/10.0.0.241:2181, sessionid = 0x1471fbc929c00f4, negotiated timeout = 20000
> > 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
> > 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered.
> > 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Session: 0x1471fbc929c00f4 closed
> > 2014-07-10 11:00:15 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] EventThread shut down
> > 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=zookeeper1:2181/transactional/tridentTestOpaqueSpout/user sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@4fa983e
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using SASL (unknown error)
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Socket connection established to zookeeper1/10.0.0.241:2181, initiating session
> > 2014-07-10 11:00:15 b.s.d.executor [INFO] Prepared bolt spout0:(5)
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Session establishment complete on server zookeeper1/10.0.0.241:2181, sessionid = 0x1471fbc929c00f5, negotiated timeout = 20000
> > 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
> > 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered.
> > 2014-07-10 11:00:16 b.s.util [ERROR] Async loop died!
> > java.lang.RuntimeException: java.net.ConnectException: Connection refused
> >         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
> >         at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_30]
> > Caused by: java.net.ConnectException: Connection refused
> >         at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_30]
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534) ~[na:1.6.0_30]
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) ~[stormjar.jar:na]
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) ~[stormjar.jar:na]
> >         at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) ~[stormjar.jar:na]
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
> >         at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124) ~[stormjar.jar:na]
> >         at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:na]
> >         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[stormjar.jar:na]
> >         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:111) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194) ~[stormjar.jar:na]
> >         at storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         ... 6 common frames omitted
> > 2014-07-10 11:00:16 b.s.d.executor [ERROR]
> >
> > I've specified in the simple test topology that I want only one worker, so what happens after this error is that the supervisor boots up another worker, which fails with the same error after about a minute of running.
> >
> > What is this about, and how do I fix it? (The setup is the same: a default Wirbelsturm cluster running on a 64-bit Ubuntu machine.)
> >
> > Btw, here is the new test topology, using the opaque Kafka spout (the same thing happens with the transactional one too):
> >
> > public class TridentKafkaOpaqueDeployer {
> >
> >     public static class PrinterBolt extends BaseFunction {
> >         private static final long serialVersionUID = -5585127152942983256L;
> >
> >         @Override
> >         public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
> >             System.out.println(tuple.toString());
> >         }
> >     }
> >
> >     public static void main(String[] args) throws Exception {
> >
> >         BrokerHosts brokerHosts = new ZkHosts("zookeeper1");
> >         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "tridentOpaqueTest");
> >
> >         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> >         // kafkaConfig.forceFromStart = true;
> >
> >         TridentTopology topology = new TridentTopology();
> >
> >         topology
> >             .newStream("tridentTestOpaqueSpout", new OpaqueTridentKafkaSpout(kafkaConfig))
> >             .name("tridentTestOpaqueSpout")
> >             .each(new Fields("str"), new PrinterBolt(), new Fields("print"));
> >
> >         Config config = new Config();
> >         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
> >         config.setNumWorkers(1);
> >         config.setMaxSpoutPending(3);
> >
> >         StormSubmitter.submitTopologyWithProgressBar(args[0], config, topology.build());
> >     }
> > }
> >
> > // note: I also resolved the issue with reusing the spout name across topologies, as per the recommendation Nathan gave some time ago:
> > https://groups.google.com/forum/#!topic/storm-user/Tn43K1eGcKY
> >
> > On Wed, Jul 9, 2014 at 9:49 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com> wrote:
> >>
> >> Very strange. Could you try deleting Trident's data in Zookeeper:
> >>
> >> $ sh zkCli.sh
> >> rmr /transactional
> >>
> >> and then resubmitting the topology and repeating your test scenario?
> >>
> >> Maybe the spout's data in Zookeeper somehow got corrupted because you are setting forceFromStart in the spout and resubmitting the topology multiple times. I think the transactional topology may be left in an undefined state in that case.
> >>
> >> You could also enable the LoggingMetricsConsumer in storm.yaml, and then check the Kafka spout's kafka.latestOffset metric in metrics.log, and compare this offset with the one Kafka's own utility script outputs (search under kafka/bin/ for the script).
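For reference, a minimal sketch of the LoggingMetricsConsumer suggestion above, wired up from the topology code rather than storm.yaml. It assumes the storm-core 0.9.2-incubating classes already used in this thread (backtype.storm.Config and backtype.storm.metric.LoggingMetricsConsumer); the parallelism hint of 1 is only an example.

    import backtype.storm.metric.LoggingMetricsConsumer;

    // inside main(), alongside the other Config settings:
    Config config = new Config();
    // Send all registered topology metrics (including the Kafka spout's
    // kafka.latestOffset) to the metrics log on the worker hosts.
    config.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);

With this in place, the kafka.latestOffset values mentioned above should show up in metrics.log, which is what the comparison against Kafka's own offsets relies on.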
> >> On Wednesday, July 9, 2014, Miloš Solujić <milos.solujic@gmail.com> wrote:
> >>>
> >>> Yep, I did double-check.
> >>>
> >>> Here is how it's done:
> >>>
> >>> # create topic
> >>> /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1:2181 --replication-factor 1 --partition 1 --topic scores
> >>>
> >>> # check what was created
> >>> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181 --describe --topic scores
> >>>
> >>> # produce a few messages
> >>> /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic scores
> >>>
> >>> # consume
> >>> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181 --topic scores --from-beginning
> >>>
> >>> > try enabling Config.setDebug(true) and monitoring the Kafka spout's activity in the logs.
> >>> Did that; only tick tuples are shipped around, nothing else.
> >>>
> >>> > Also, you should paste all your worker logs (worker-*.log files).
> >>> Forgot to mention, only one worker is set, precisely to keep things simple.
> >>>
> >>> Here is a simplified version of this topology (no Trident state, only a simple printer bolt):
> >>>
> >>> public class TridentKafkaDeployer {
> >>>
> >>>     public static class PrinterBolt extends BaseFunction {
> >>>         private static final long serialVersionUID = -5585127152942983256L;
> >>>
> >>>         @Override
> >>>         public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
> >>>             System.out.println(tuple.toString());
> >>>         }
> >>>     }
> >>>
> >>>     public static void main(String[] args) throws Exception {
> >>>
> >>>         BrokerHosts zk = new ZkHosts("zookeeper1");
> >>>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, "scores");
> >>>
> >>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> >>>         kafkaConfig.forceFromStart = true;
> >>>
> >>>         TridentTopology topology = new TridentTopology();
> >>>
> >>>         topology
> >>>             .newStream("raw-scores", new TransactionalTridentKafkaSpout(kafkaConfig))
> >>>             .name("kafkaSpout")
> >>>             .each(new Fields("str"), new PrinterBolt(), new Fields("print"));
> >>>
> >>>         Config config = new Config();
> >>>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
> >>>         config.setNumWorkers(1);
> >>>         config.setMaxSpoutPending(3);
> >>>
> >>>         StormSubmitter.submitTopologyWithProgressBar(args[0], config, topology.build());
> >>>     }
> >>> }
> >>>
> >>> Exactly the same behaviour (it reads from exactly the same Kafka topic): no fresh messages are picked up from the Kafka topic.
> >>>
> >>> On Tue, Jul 8, 2014 at 7:08 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com> wrote:
> >>>>
> >>>> Also, you should paste all your worker logs (worker-*.log files).
> >>>>
> >>>> On Tuesday, July 8, 2014, Danijel Schiavuzzi <danijel@schiavuzzi.com> wrote:
> >>>>>
> >>>>> I'd double-check the Kafka producer to make sure those messages are really getting into the right Kafka topic. Also, try enabling Config.setDebug(true) and monitoring the Kafka spout's activity in the logs. setMaxSpoutPending should always be set, as by default it is unset, so you risk internal queue explosion.
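A minimal sketch of the debug suggestion above, using the same backtype.storm.Config class as the test topologies in this thread; the rest of the submit code stays unchanged.

    Config config = new Config();
    // topology.debug = true: workers log every emitted tuple, so the logs show
    // whether the Kafka spout emits anything at all besides tick tuples.
    config.setDebug(true);

Debug output is very verbose, so it is best limited to a small test topology like the ones posted here.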
> >>>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com> wrote:
> >>>>>>
> >>>>>> Yep, pretty much sure: via the internal kafka-producer.sh. The same method was used to produce the initial messages (before the first launch of the topology), and those got consumed and processed just fine.
> >>>>>>
> >>>>>> As for maxSpoutPending, first I tried it with 10, then removed it (left the default value).
> >>>>>>
> >>>>>> On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com> wrote:
> >>>>>>>
> >>>>>>> Are you sure you are producing new messages into the same Kafka topic? What number did you set maxSpoutPending to?
> >>>>>>>
> >>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Thanks Danijel for your quick suggestion.
> >>>>>>>>
> >>>>>>>> I tried lowering and then removing all the performance settings (those were left over from load testing on one machine).
> >>>>>>>>
> >>>>>>>> Still the same result: no matter what, new messages are not taken from Kafka after the topology is redeployed.
> >>>>>>>>
> >>>>>>>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10). In Trident, setMaxSpoutPending refers to the number of batches, not tuples like in plain Storm. Too high values may cause blockages like the one you describe.
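To make the batches-versus-tuples point concrete, a one-line sketch of the suggested change, written against the stormConfig object from the original configuration snippet quoted further down; the value 10 is simply the figure suggested above.

    // In Trident, max spout pending throttles the number of in-flight *batches*,
    // not individual tuples, so 100000 allows an enormous backlog of batches.
    stormConfig.setMaxSpoutPending(10);  // the much lower value suggested above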
> >>>>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I'm pretty new to Storm and Kafka/Zookeeper, and I hope my question is not too dumb. Here it goes:
> >>>>>>>>>>
> >>>>>>>>>> I'm using the latest stable Storm and storm-kafka = 0.9.2-incubating.
> >>>>>>>>>>
> >>>>>>>>>> I've set up a test cluster using the Wirbelsturm tool with an unchanged yaml (just uncommented the kafka machine).
> >>>>>>>>>>
> >>>>>>>>>> Here is the config snippet for my Trident topology:
> >>>>>>>>>>
> >>>>>>>>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
> >>>>>>>>>>         TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
> >>>>>>>>>>
> >>>>>>>>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> >>>>>>>>>>         kafkaConf.fetchSizeBytes = 10000;
> >>>>>>>>>>         kafkaConf.forceFromStart = true;
> >>>>>>>>>>
> >>>>>>>>>>         Config stormConfig = new Config();
> >>>>>>>>>>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
> >>>>>>>>>>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
> >>>>>>>>>>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
> >>>>>>>>>>         stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
> >>>>>>>>>>         // performance settings
> >>>>>>>>>>         stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
> >>>>>>>>>>         stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
> >>>>>>>>>>         stormConfig.setMaxSpoutPending(100000);
> >>>>>>>>>>
> >>>>>>>>>>         if (args != null && args.length > 0) {
> >>>>>>>>>>             StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
> >>>>>>>>>>                     BuildTridentScoreTopology.build(kafkaConf));
> >>>>>>>>>>         } else {...}
> >>>>>>>>>>
> >>>>>>>>>> Now, I created the 'scores' topic in Kafka and pushed a few test messages prior to starting the topology, with kafkaConf.forceFromStart = true. The topology processed those messages just fine and stored them in the Trident state (Couchbase).
> >>>>>>>>>>
> >>>>>>>>>> All new messages are simply ignored!
> >>>>>>>>>>
> >>>>>>>>>> After redeploying the topology (both with forceFromStart = true and forceFromStart = false) no more messages are ingested from Kafka.
> >>>>>>>>>>
> >>>>>>>>>> Here is the worker log for one topology deployment and a short run: http://pastie.org/private/4xsk6pijvmulwrcg7zgca
> >>>>>>>>>>
> >>>>>>>>>> These are the VMs that host this Storm cluster:
> >>>>>>>>>> 10.0.0.241 zookeeper1
> >>>>>>>>>> 10.0.0.101 supervisor1
> >>>>>>>>>> 10.0.0.21 kafka1
> >>>>>>>>>> 10.0.0.251 nimbus1
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Milos
> >>
> >> --
> >> Danijel Schiavuzzi
> >>
> >> E: danijel@schiavuzzi.com
> >> W: www.schiavuzzi.com
> >> T: +385989035562
> >> Skype: danijels7

Yes

On 10 Jul 2014 14:03, "Danijel Schiavuzzi&q= uot; <danijel@schiavuzzi.com> wrote:

Did you kill your topology before clearing the Zookeeper dat= a?

On Jul 10, 2014 1:24 PM, "Milo=C5=A1 Soluji=C4=87"= <milos.sol= ujic@gmail.com> wrote:
>
> Thanks Danijel for taking interest in my problem.
>
> Exactly same feeling I've got (that zookeeper data is corrupted) S= o I purged info about it via zkCli.sh
>
> Now I've got some lower level issues:
>
> 2014-07-10 11:00:13 b.s.d.worker [INFO] Worker 04a17a6b-5aea-47ce-808b= -218c4bcc1d51 for storm tridentOpaqueTest-topology-3-1404989752 on e4cdd619= -c7e4-40f0-941e-352ac41daf6e:6701 has finished loading
> 2014-07-10 11:00:14 s.k.DynamicBrokersReader [INFO] Read partition inf= o from zookeeper: GlobalPartitionInformation{partitionMap=3D{0=3Dlocalhost:= 9092}}
> 2014-07-10 11:00:14 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> 2014-07-10 11:00:14 o.a.z.ZooKeeper [INFO] Initiating client connectio= n, connectString=3Dzookeeper1:2181 sessionTimeout=3D20000 watcher=3Dorg.apa= che.curator.ConnectionState@203c6f3e
> 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Opening socket connection = to server zookeeper1/1= 0.0.0.241:2181. Will not attempt to authenticate using SASL (unknown er= ror)
> 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Socket connection establis= hed to zookeeper1/10.0= .0.241:2181, initiating session
> 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Session establishment comp= lete on server zookeeper1/10.0.0.241:2181, sessionid =3D 0x1471fbc929c00f4, negotiated timeou= t =3D 20000
> 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [INFO] State chan= ge: CONNECTED
> 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [WARN] There are = no ConnectionStateListeners registered.
> 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Session: 0x1471fbc929c00f4 = closed
> 2014-07-10 11:00:15 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] EventThread shut down
> 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Initiating client connectio= n, connectString=3Dzookeeper1:2181/transactional/tridentTestOpaqueSpout/use= r sessionTimeout=3D20000 watcher=3Dorg.apache.curator.ConnectionState@4fa > 983e
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Opening socket connection = to server zookeeper1/1= 0.0.0.241:2181. Will not attempt to authenticate using SASL (unknown er= ror)
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Socket connection establis= hed to zookeeper1/10.0= .0.241:2181, initiating session
> 2014-07-10 11:00:15 b.s.d.executor [INFO] Prepared bolt spout0:(5)
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Session establishment comp= lete on server zookeeper1/10.0.0.241:2181, sessionid =3D 0x1471fbc929c00f5, negotiated timeou= t =3D 20000
> 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [INFO] State chan= ge: CONNECTED
> 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [WARN] There are = no ConnectionStateListeners registered.
> 2014-07-10 11:00:16 b.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.net.ConnectException: Connection refu= sed
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at backtype.storm.utils.Dis= ruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.= 2-incubating.jar:0.9.2-incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at backtype.storm.utils.Dis= ruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-= 0.9.2-incubating.jar:0.9.2-incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at backtype.storm.disruptor= $consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-i= ncubating.jar:0.9.2-incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at backtype.storm.daemon.ex= ecutor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9= .2-incubating.jar:0.9.2-incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at backtype.storm.util$asyn= c_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2= -incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at clojure.lang.AFn.run(AFn= .java:24) ~[clojure-1.5.1.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at java.lang.Thread.run(Thr= ead.java:701) ~[na:1.6.0_30]
> Caused by: java.net.ConnectException: Connection refused
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at sun.nio.ch.Net.connect(N= ative Method) ~[na:1.6.0_30]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at sun.nio.ch.SocketChannel= Impl.connect(SocketChannelImpl.java:534) ~[na:1.6.0_30]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at kafka.network.BlockingCh= annel.connect(BlockingChannel.scala:57) ~[stormjar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at kafka.consumer.SimpleCon= sumer.connect(SimpleConsumer.scala:44) ~[stormjar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at kafka.consumer.SimpleCon= sumer.getOrMakeConnection(SimpleConsumer.scala:142) ~[stormjar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at kafka.consumer.SimpleCon= sumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~= [stormjar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at kafka.consumer.SimpleCon= sumer.getOffsetsBefore(SimpleConsumer.scala:124) ~[stormjar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at kafka.javaapi.consumer.S= impleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:na]<= br> > =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.kafka.KafkaUtils.g= etOffset(KafkaUtils.java:77) ~[stormjar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.kafka.KafkaUtils.g= etOffset(KafkaUtils.java:67) ~[stormjar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.kafka.trident.Trid= entKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:111) ~[sto= rmjar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.kafka.trident.Trid= entKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) = ~[stormjar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.kafka.trident.Trid= entKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79) ~[stormj= ar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.kafka.trident.Trid= entKafkaEmitter.access$000(TridentKafkaEmitter.java:46) ~[stormjar.jar:na]<= br> > =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.kafka.trident.Trid= entKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204) ~[stormj= ar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.kafka.trident.Trid= entKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194) ~[stormj= ar.jar:na]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.trident.spout.Opaq= uePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTriden= tSpoutExecutor.java:127) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating= ]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.trident.spout.Trid= entSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-0.9.2-i= ncubating.jar:0.9.2-incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at storm.trident.topology.T= ridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.9.2= -incubating.jar:0.9.2-incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at backtype.storm.daemon.ex= ecutor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core= -0.9.2-incubating.jar:0.9.2-incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at backtype.storm.daemon.ex= ecutor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.= 2-incubating.jar:0.9.2-incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at backtype.storm.disruptor= $clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-in= cubating.jar:0.9.2-incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 at backtype.storm.utils.Dis= ruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.= 2-incubating.jar:0.9.2-incubating]
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 ... 6 common frames omitted=
> 2014-07-10 11:00:16 b.s.d.executor [ERROR]
>
> I've specified in simple test topology that I want only one worker= , so what happens after this error, supervisor will boot up another woorker= , which fails with same error after ~ minute of running.
>
> What is this about and how to fix this problem? (setup is the same, de= fault wirbelsturm cluster running on ubuntu 64b machine)
>
> btw, here is new test topology, using opaque kafka spout (same thing h= appens with transactional too)
>
>
> public class TridentKafkaOpaqueDeployer {
>
>
> =C2=A0=C2=A0=C2=A0 public static class PrinterBolt extends BaseFunctio= n {
> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 private static final long serial= VersionUID =3D -5585127152942983256L;
>
> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 @Override
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 public void execute(Trident= Tuple tuple, TridentCollector tridentCollector) {
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 Sys= tem.out.println(tuple.toString());
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 }
> =C2=A0=C2=A0=C2=A0 }
> =C2=A0=C2=A0=C2=A0
> =C2=A0=C2=A0=C2=A0 public static void main(String[] args) throws Excep= tion {
>
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 BrokerHosts brokerHosts =3D= new ZkHosts("zookeeper1");
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 TridentKafkaConfig kafkaCon= fig =3D new TridentKafkaConfig(brokerHosts, "tridentOpaqueTest");=
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 kafkaConfig.scheme =3D new = SchemeAsMultiScheme(new StringScheme());
> //=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 kafkaConfig.forceFromStar= t =3D true;
>
>
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 TridentTopology topology = =3D new TridentTopology();
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 topology
> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 .newStream(&q= uot;tridentTestOpaqueSpout", new OpaqueTridentKafkaSpout(kafkaConfig))=
> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0= =C2=A0 =C2=A0=C2=A0=C2=A0 .name("tridentTestOpaqueSpout")
>
> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 .each(ne= w Fields("str"), new PrinterBolt(),
> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 =C2=A0= =C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 new Fields("print"));
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 Config config =3D new Confi= g();
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 config.put(Config.TOPOLOGY_= TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 config.setNumWorkers(1); > =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 config.setMaxSpoutPending(3= );
>
> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 StormSubmitter.submitTopologyWit= hProgressBar(args[0], config, topology.build());
> =C2=A0=C2=A0=C2=A0 }
> }
>
>
> // note
>
> I also resolved issue with reusing spout name across topologies, as pe= r recommendation that Nathan gave some time ago
> https://groups.google.com/forum/#!topic/storm-user/T= n43K1eGcKY
>
>
>
>
> On Wed, Jul 9, 2014 at 9:49 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com>= wrote:
>>
>> Very strange.=C2=A0Could you try deleting=C2=A0Trident's data = in=C2=A0Zookeeper:
>>
>> $=C2=A0sh zkCli.sh
>> rmr /transactional
>>
>> and=C2=A0then resubmitting the topology and repeating your test sc= enario?
>>
>> Maybe the the spout's data in Zookeeper got somehow corrupted = because you are setting forceFromStart in the spout, and resubmitting the t= opology multiple times. I think the transactional=C2=A0topology may be left= in=C2=A0an undefined state that=C2=A0case.
>>
>> You could also enable the LoggingMetricsConsumer in storm.yaml, an= d then check the Kafka spout's=C2=A0kafka.latestOffset metric in metric= s.log, and compare this offset with the one Kafka's own utility script= =C2=A0outputs (search=C2=A0under=C2=A0kafka/bin/ for the script).
>>
>> On Wednesday, July 9, 2014, Milo=C5=A1 Soluji=C4=87 <milos.solujic@gmail.com<= /a>> wrote:
>>>
>>> Yep, I did double checked.
>>>
>>> Here is how it's done:
>>>
>>> #create topic
>>> /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1= :2181 --replication-factor 1 --partition 1 --topic scores
>>>
>>> #check what is created
>>> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181 --d= escribe --topic scores
>>>
>>> #produce few messages
>>> /opt/kafka/bin/kafka-console-producer.sh --broker-list localho= st:9092 --topic scores
>>>
>>> #consumer
>>> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper= 1:2181 --topic scores --from-beginning
>>>
>>>
>>>
>>>
>>> > try=C2=A0enabling=C2=A0Config.setDebug(true) and monitori= ng=C2=A0the Kafka spout's activity in the logs.
>>> did that, only tick touples are shipped around, nothing else >>>
>>> > Also, you should paste all your worker logs (worker-*.log= files).
>>> Forgot to mention, only one worker is set, exactly for reason = to simplify things.
>>>
>>>
>>>
>>> Here is simplified version of this topology (no trident state,= only simple printer bolt)
>>>
>>>
>>> public class TridentKafkaDeployer {
>>>
>>> =C2=A0=C2=A0=C2=A0 public static class PrinterBolt extends Bas= eFunction {
>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 private static final lon= g serialVersionUID =3D -5585127152942983256L;
>>>
>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 @Override
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 public void execute= (TridentTuple tuple, TridentCollector tridentCollector) {
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0= =C2=A0 System.out.println(tuple.toString());
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 }
>>> =C2=A0=C2=A0=C2=A0 }
>>> =C2=A0=C2=A0=C2=A0
>>> =C2=A0=C2=A0=C2=A0 public static void main(String[] args) thro= ws Exception {
>>>
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 BrokerHosts zk =3D = new ZkHosts("zookeeper1");
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 TridentKafkaConfig = kafkaConfig =3D new TridentKafkaConfig(zk, "scores");
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 kafkaConfig.scheme = =3D new SchemeAsMultiScheme(new StringScheme());
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 kafkaConfig.forceFr= omStart =3D true;
>>>
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 TridentTopology top= ology =3D new TridentTopology();
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 topology
>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 .newS= tream("raw-scores", new TransactionalTridentKafkaSpout(kafkaConfi= g))
>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 =C2= =A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 .name("kafkaSpout")
>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 = .each(new Fields("str"), new PrinterBolt(),
>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 = =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 new Fields("print"));
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 Config config =3D n= ew Config();
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 config.put(Config.T= OPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 config.setNumWorker= s(1);
>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 config.setMaxSpoutP= ending(3);
>>>
>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 StormSubmitter.submitTop= ologyWithProgressBar(args[0], config, topology.build());
>>> =C2=A0=C2=A0=C2=A0 }
>>> }
>>>
>>> Exactly same behaviour (it goes to exactly same kafka topic) = =3D no picking up fresh messages in kafka topic.
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jul 8, 2014 at 7:08 PM, Danijel Schiavuzzi <
danijel@schiavuzzi.com= > wrote:
>>>>
>>>> Also, you should paste all your worker logs (worker-*.log = files).
>>>>
>>>>
>>>> On Tuesday, July 8, 2014, Danijel Schiavuzzi <danijel@schiavuzzi.com= > wrote:
>>>>>
>>>>> I'd double check the Kafka producer to make sure t= hose=C2=A0messages are really getting into the right=C2=A0Kafka topic. Also= , try=C2=A0enabling=C2=A0Config.setDebug(true) and monitoring=C2=A0the Kafk= a spout's activity in the logs. setMaxSpoutPending should always be set= , as by default it is unset, so you risk internal=C2=A0queue explosion.
>>>>>
>>>>> On Tuesday, July 8, 2014, Milo=C5=A1 Soluji=C4=87 <= milos.solujic@= gmail.com> wrote:
>>>>>>
>>>>>> Yep. pretty much sure. Via internal kafka-producer= .sh
>>>>>> same method is used to produce initial messages (b= efore first launch of topology, that got consumed and processed just fine)<= br> >>>>>>
>>>>>> as for maxSpoutPending first I tried with 10, than= removed it (left default value)
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi= <danijel@sc= hiavuzzi.com> wrote:
>>>>>>>
>>>>>>> Are you sure you are producing new messages in= to the same=C2=A0Kafka topic?=C2=A0What number did you set maxSpoutPending = to?
>>>>>>>
>>>>>>> On Tuesday, July 8, 2014, Milo=C5=A1 Soluji=C4= =87 <milos.= solujic@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Thanks Danijel for your quick proposition.=
>>>>>>>>
>>>>>>>> I tried lowering down and removing all per= formance settings (those were left from load testing on one machine)
>>>>>>>>
>>>>>>>> Still same result: no matter what, new mes= sages are not taken from kafka after topology is redeployed.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Sc= hiavuzzi <da= nijel@schiavuzzi.com> wrote:
>>>>>>>>>
>>>>>>>>> Try lowering setMaxSpoutPending(100000= ) to a much lower value (like 10). In Trident, setMaxSpoutPending referns t= o the number of batches, not tuples like in plain Storm. Too high values ma= y cause blockages like the one you describe.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tuesday, July 8, 2014, Milo=C5=A1 S= oluji=C4=87 <milos.solujic@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I'm pretty new to storm and ka= fka/zookeeper, and I hope that my question is not to dumb. Here it goes: >>>>>>>>>>
>>>>>>>>>> I'm using latest stable storm = and storm-kafka =3D 0.9.2-incubating
>>>>>>>>>>
>>>>>>>>>> I've setup test cluster using = wirbelsturm tool with unchanged yaml (just uncommented kafka machine)
>>>>>>>>>>
>>>>>>>>>> here is config snippet for my trid= ent topology:
>>>>>>>>>>
>>>>>>>>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2= =A0=C2=A0 BrokerHosts zk =3D new ZkHosts("zookeeper1");
>>>>>>>>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2= =A0=C2=A0 TridentKafkaConfig kafkaConf =3D new TridentKafkaConfig(zk, "= ;scores");=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0
>>>>>>>>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2= =A0=C2=A0
>>>>>>>>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2= =A0=C2=A0 kafkaConf.scheme =3D new SchemeAsMultiScheme(new StringScheme());=
>>>>>>>>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2= =A0=C2=A0 kafkaConf.fetchSizeBytes =3D 10000;
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 kafkaConf.forceFromStart =3D true;
>>>>>>>>>>
>>>>>>>>>> =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2= =A0=C2=A0 Config stormConfig =3D new Config();
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 stormConfig.put("couchbase.password", COUCHBASE_PASSWORD); >>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 // performance settings
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100= );
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 stormConfig.setMaxSpoutPending(100000);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 if (args !=3D null && args.length > 0) {
>>>>>>>>>>
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 =C2=A0=C2=A0=C2=A0 StormSubmitter.submitTopologyWithProgressBar(args[0]= , stormConfig,
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 BuildTridentSc= oreTopology.build(kafkaConf));
>>>>>>>>>> =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2= =A0 } else {...}
>>>>>>>>>>
>>>>>>>>>> Now, I've created 'scores&= #39; topic in kafka and pushed few test messages prior to starting topology= , with kafkaConf.forceFromStart =3D true. And topology processed those mess= ages just fine, and stored them in tridentState (couhbase)
>>>>>>>>>>
>>>>>>>>>> All new messages are simply ignore= d!
>>>>>>>>>>
>>>>>>>>>> After redeploying topology (both w= ith forceFromStart =3D true and forceFromStart =3D false) no more messages = are ingested from kafka.
>>>>>>>>>>
>>>>>>>>>> here is worker log for one topolog= y deployment and short run http://pastie.org/private/4xsk6pijvmulwrcg7zg= ca
>>>>>>>>>>
>>>>>>>>>> those are VMs that host this storm= cluster
>>>>>>>>>> 10.0.0.241 zookeeper1
>>>>>>>>>> 10.0.0.101 supervisor1
>>>>>>>>>> 10.0.0.21 kafka1
>>>>>>>>>> 10.0.0.251 nimbus1
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Milos
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> =C2=A0
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Danijel Schiavuzzi
>>>>>>>>>
>>>>>>>>> E: danijel@schiavuzzi.com
>>>>>>>>> W: www.schiavuzzi.com
>>>>>>>>> T: +385989035562
>>>>>>>>> Skype: danijels7
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Danijel Schiavuzzi
>>>>>>>
>>>>>>> E: danijel@schiavuzzi.com
>>>>>>> W: www.schiavuzzi.com
>>>>>>> T: +385989035562
>>>>>>> Skype: danijels7
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Danijel Schiavuzzi
>>>>>
>>>>> E: danijel@schiavuzzi.com
>>>>> W: www.schiavuzzi.com
>>>>> T: +385989035562
>>>>> Skype: danijels7
>>>>
>>>>
>>>>
>>>> --
>>>> Danijel Schiavuzzi
>>>>
>>>> E: danijel@schiavuzzi.com
>>>> W: www.schiavuzzi.com
>>>> T: +385989035562
>>>> Skype: danijels7
>>>
>>>
>>
>>
>> --
>> Danijel Schiavuzzi
>>
>> E: dan= ijel@schiavuzzi.com
>> W: www.sch= iavuzzi.com
>> T: +385989035562
>> Skype: danijels7
>
>

--f46d0438eb9b7cef2704fdd5ad7a--