Yes

On 10 Jul 2014 14:03, "Danijel Schiavuzzi" wrote:
> Did you kill your topology before clearing the ZooKeeper data?
>
> On Jul 10, 2014 1:24 PM, "Miloš Solujić" wrote:
> >
> > Thanks, Danijel, for taking an interest in my problem.
> >
> > I had exactly the same feeling (that the ZooKeeper data is corrupted), so I purged the info about it via zkCli.sh.
> >
> > Now I've got some lower-level issues:
> >
> > 2014-07-10 11:00:13 b.s.d.worker [INFO] Worker 04a17a6b-5aea-47ce-808b-218c4bcc1d51 for storm tridentOpaqueTest-topology-3-1404989752 on e4cdd619-c7e4-40f0-941e-352ac41daf6e:6701 has finished loading
> > 2014-07-10 11:00:14 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092}}
> > 2014-07-10 11:00:14 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> > 2014-07-10 11:00:14 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=zookeeper1:2181 sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@203c6f3e
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using SASL (unknown error)
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Socket connection established to zookeeper1/10.0.0.241:2181, initiating session
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Session establishment complete on server zookeeper1/10.0.0.241:2181, sessionid = 0x1471fbc929c00f4, negotiated timeout = 20000
> > 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
> > 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered.
> > 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Session: 0x1471fbc929c00f4 closed
> > 2014-07-10 11:00:15 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] EventThread shut down
> > 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=zookeeper1:2181/transactional/tridentTestOpaqueSpout/user sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@4fa983e
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using SASL (unknown error)
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Socket connection established to zookeeper1/10.0.0.241:2181, initiating session
> > 2014-07-10 11:00:15 b.s.d.executor [INFO] Prepared bolt spout0:(5)
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Session establishment complete on server zookeeper1/10.0.0.241:2181, sessionid = 0x1471fbc929c00f5, negotiated timeout = 20000
> > 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
> > 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered.
> > 2014-07-10 11:00:16 b.s.util [ERROR] Async loop died!
> > java.lang.RuntimeException: java.net.ConnectException: Connection refused
> >         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
> >         at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_30]
> > Caused by: java.net.ConnectException: Connection refused
> >         at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_30]
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534) ~[na:1.6.0_30]
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) ~[stormjar.jar:na]
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) ~[stormjar.jar:na]
> >         at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) ~[stormjar.jar:na]
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
> >         at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124) ~[stormjar.jar:na]
> >         at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:na]
> >         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[stormjar.jar:na]
> >         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:111) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204) ~[stormjar.jar:na]
> >         at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194) ~[stormjar.jar:na]
> >         at storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         ... 6 common frames omitted
> > 2014-07-10 11:00:16 b.s.d.executor [ERROR]
> >
> > I've specified in the simple test topology that I want only one worker, so what happens after this error is that the supervisor boots up another worker, which fails with the same error after about a minute of running.
> >
> > What is this about, and how do I fix this problem? (The setup is the same: a default Wirbelsturm cluster running on a 64-bit Ubuntu machine.)
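One detail worth noting in the log above: the spout read the partition info from ZooKeeper as {0=localhost:9092}, so the worker running on supervisor1 will try to reach a Kafka broker on its own localhost rather than on kafka1 (10.0.0.21), which would explain the Connection refused coming out of the SimpleConsumer. A JDK-only sketch for checking, from the supervisor host, whether that resolved address is actually reachable (the class name and defaults below are illustrative, not part of the thread's code):

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    // Minimal TCP reachability check for the broker address the spout resolved
    // from ZooKeeper (localhost:9092 in the worker log above). Run it on the
    // supervisor host; a "Connection refused" here mirrors what the worker sees.
    public class BrokerReachabilityCheck {
        public static void main(String[] args) {
            String host = args.length > 0 ? args[0] : "localhost"; // from GlobalPartitionInformation
            int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
            Socket socket = new Socket();
            try {
                socket.connect(new InetSocketAddress(host, port), 5000); // 5 s timeout
                System.out.println("Broker reachable at " + host + ":" + port);
            } catch (IOException e) {
                System.out.println("Cannot reach " + host + ":" + port + " -> " + e.getMessage());
            } finally {
                try { socket.close(); } catch (IOException ignored) { }
            }
        }
    }

If localhost:9092 is refused from supervisor1 while kafka1:9092 works, the broker is probably registering itself in ZooKeeper under a hostname the workers cannot use, which would be worth checking on the Kafka side (e.g. the host.name / advertised.host.name settings in the broker's server.properties).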
> >
> > By the way, here is the new test topology, using the opaque Kafka spout (the same thing happens with the transactional one too):
> >
> > public class TridentKafkaOpaqueDeployer {
> >
> >     public static class PrinterBolt extends BaseFunction {
> >         private static final long serialVersionUID = -5585127152942983256L;
> >
> >         @Override
> >         public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
> >             System.out.println(tuple.toString());
> >         }
> >     }
> >
> >     public static void main(String[] args) throws Exception {
> >
> >         BrokerHosts brokerHosts = new ZkHosts("zookeeper1");
> >         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "tridentOpaqueTest");
> >
> >         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> >         // kafkaConfig.forceFromStart = true;
> >
> >         TridentTopology topology = new TridentTopology();
> >
> >         topology
> >             .newStream("tridentTestOpaqueSpout", new OpaqueTridentKafkaSpout(kafkaConfig))
> >             .name("tridentTestOpaqueSpout")
> >             .each(new Fields("str"), new PrinterBolt(), new Fields("print"));
> >
> >         Config config = new Config();
> >         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
> >         config.setNumWorkers(1);
> >         config.setMaxSpoutPending(3);
> >
> >         StormSubmitter.submitTopologyWithProgressBar(args[0], config, topology.build());
> >     }
> > }
> >
> > // Note: I also resolved the issue with reusing a spout name across topologies, as per the recommendation Nathan gave some time ago:
> > https://groups.google.com/forum/#!topic/storm-user/Tn43K1eGcKY
> >
> > On Wed, Jul 9, 2014 at 9:49 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com> wrote:
> >>
> >> Very strange. Could you try deleting Trident's data in ZooKeeper:
> >>
> >> $ sh zkCli.sh
> >> rmr /transactional
> >>
> >> and then resubmitting the topology and repeating your test scenario?
> >>
> >> Maybe the spout's data in ZooKeeper somehow got corrupted because you are setting forceFromStart in the spout and resubmitting the topology multiple times. I think the transactional topology may be left in an undefined state in that case.
> >>
> >> You could also enable the LoggingMetricsConsumer in storm.yaml, then check the Kafka spout's kafka.latestOffset metric in metrics.log and compare this offset with the one Kafka's own utility script outputs (search under kafka/bin/ for the script).
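For that comparison, the utility script Danijel refers to is presumably kafka.tools.GetOffsetShell under kafka/bin/; the same number can also be fetched programmatically with the Kafka 0.8 SimpleConsumer API that storm-kafka itself uses (the getOffsetsBefore call visible in the stack trace above). A hedged sketch, with the broker host kafka1:9092, topic "scores", and partition 0 assumed from this thread:

    import java.util.Collections;
    import java.util.Map;

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetRequest;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    // Fetches the latest offset of partition 0 of the "scores" topic directly from
    // the broker, so it can be compared with the spout's kafka.latestOffset metric.
    // Host, port, topic and partition are assumptions taken from this thread; the
    // request must go to the partition's leader broker (the single broker here).
    public class LatestOffsetCheck {
        public static void main(String[] args) {
            SimpleConsumer consumer = new SimpleConsumer("kafka1", 9092, 10000, 64 * 1024, "offset-check");
            try {
                TopicAndPartition tp = new TopicAndPartition("scores", 0);
                Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                        Collections.singletonMap(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
                OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-check");
                OffsetResponse response = consumer.getOffsetsBefore(request);
                if (response.hasError()) {
                    System.out.println("Error fetching offset: " + response.errorCode("scores", 0));
                } else {
                    long[] offsets = response.offsets("scores", 0);
                    System.out.println("Latest offset for scores/0: " + offsets[0]);
                }
            } finally {
                consumer.close();
            }
        }
    }

Comparing the printed value with the kafka.latestOffset entries in metrics.log shows whether the spout even sees the newly produced messages.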
> >> On Wednesday, July 9, 2014, Miloš Solujić wrote:
> >>>
> >>> Yep, I did double-check.
> >>>
> >>> Here is how it's done:
> >>>
> >>> # create topic
> >>> /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1:2181 --replication-factor 1 --partition 1 --topic scores
> >>>
> >>> # check what was created
> >>> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181 --describe --topic scores
> >>>
> >>> # produce a few messages
> >>> /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic scores
> >>>
> >>> # consume
> >>> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181 --topic scores --from-beginning
> >>>
> >>> > try enabling Config.setDebug(true) and monitoring the Kafka spout's activity in the logs.
> >>> Did that; only tick tuples are shipped around, nothing else.
> >>>
> >>> > Also, you should paste all your worker logs (worker-*.log files).
> >>> Forgot to mention: only one worker is set, precisely to keep things simple.
> >>>
> >>> Here is a simplified version of this topology (no Trident state, only a simple printer bolt):
> >>>
> >>> public class TridentKafkaDeployer {
> >>>
> >>>     public static class PrinterBolt extends BaseFunction {
> >>>         private static final long serialVersionUID = -5585127152942983256L;
> >>>
> >>>         @Override
> >>>         public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
> >>>             System.out.println(tuple.toString());
> >>>         }
> >>>     }
> >>>
> >>>     public static void main(String[] args) throws Exception {
> >>>
> >>>         BrokerHosts zk = new ZkHosts("zookeeper1");
> >>>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, "scores");
> >>>
> >>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> >>>         kafkaConfig.forceFromStart = true;
> >>>
> >>>         TridentTopology topology = new TridentTopology();
> >>>
> >>>         topology
> >>>             .newStream("raw-scores", new TransactionalTridentKafkaSpout(kafkaConfig))
> >>>             .name("kafkaSpout")
> >>>             .each(new Fields("str"), new PrinterBolt(), new Fields("print"));
> >>>
> >>>         Config config = new Config();
> >>>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
> >>>         config.setNumWorkers(1);
> >>>         config.setMaxSpoutPending(3);
> >>>
> >>>         StormSubmitter.submitTopologyWithProgressBar(args[0], config, topology.build());
> >>>     }
> >>> }
> >>>
> >>> Exactly the same behaviour (it reads from exactly the same Kafka topic): fresh messages in the Kafka topic are not picked up.
> >>>
> >>> On Tue, Jul 8, 2014 at 7:08 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com> wrote:
> >>>>
> >>>> Also, you should paste all your worker logs (worker-*.log files).
> >>>>
> >>>> On Tuesday, July 8, 2014, Danijel Schiavuzzi wrote:
> >>>>>
> >>>>> I'd double-check the Kafka producer to make sure those messages are really getting into the right Kafka topic. Also, try enabling Config.setDebug(true) and monitoring the Kafka spout's activity in the logs. setMaxSpoutPending should always be set, as by default it is unset, so you risk internal queue explosion.
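For reference, a minimal sketch of what the test topology's Config might look like with the debugging suggestions from this thread applied: Config.setDebug(true), a low max spout pending, and metrics logging. Danijel suggested enabling the LoggingMetricsConsumer via storm.yaml; registering it programmatically is an alternative, assuming backtype.storm.metric.LoggingMetricsConsumer from storm-core 0.9.2-incubating. Values are illustrative only:

    import backtype.storm.Config;
    import backtype.storm.metric.LoggingMetricsConsumer;

    // Builds a debug-oriented Config combining the suggestions from this thread.
    public class DebugTopologyConfig {
        public static Config build() {
            Config config = new Config();
            config.setDebug(true);        // log every emitted/transferred tuple
            config.setNumWorkers(1);      // a single worker keeps all logs in one place
            config.setMaxSpoutPending(3); // in Trident this caps pending batches, not tuples
            config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
            // Programmatic alternative to the storm.yaml entry Danijel mentioned;
            // the spout's kafka.latestOffset metric then appears in metrics.log.
            config.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
            return config;
        }
    }

The returned Config can then be passed to StormSubmitter.submitTopologyWithProgressBar() in place of the configs built in the topologies above.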
> >>>>> On Tuesday, July 8, 2014, Miloš Solujić wrote:
> >>>>>>
> >>>>>> Yep, pretty much sure. Via the internal kafka-producer.sh; the same method was used to produce the initial messages (before the first launch of the topology), and those got consumed and processed just fine.
> >>>>>>
> >>>>>> As for maxSpoutPending, first I tried it with 10, then removed it (left the default value).
> >>>>>>
> >>>>>> On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com> wrote:
> >>>>>>>
> >>>>>>> Are you sure you are producing new messages into the same Kafka topic? What number did you set maxSpoutPending to?
> >>>>>>>
> >>>>>>> On Tuesday, July 8, 2014, Miloš Solujić wrote:
> >>>>>>>>
> >>>>>>>> Thanks, Danijel, for your quick suggestion.
> >>>>>>>>
> >>>>>>>> I tried lowering and then removing all the performance settings (those were left over from load testing on one machine).
> >>>>>>>>
> >>>>>>>> Still the same result: no matter what, new messages are not taken from Kafka after the topology is redeployed.
> >>>>>>>>
> >>>>>>>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10). In Trident, setMaxSpoutPending refers to the number of batches, not tuples like in plain Storm. Too high values may cause blockages like the one you describe.
> >>>>>>>>>
> >>>>>>>>> On Tuesday, July 8, 2014, Miloš Solujić wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope my question is not too dumb. Here it goes:
> >>>>>>>>>>
> >>>>>>>>>> I'm using the latest stable Storm and storm-kafka, 0.9.2-incubating.
> >>>>>>>>>>
> >>>>>>>>>> I've set up a test cluster using the Wirbelsturm tool with an unchanged YAML (just uncommented the Kafka machine).
> >>>>>>>>>>
> >>>>>>>>>> Here is the config snippet for my Trident topology:
> >>>>>>>>>>
> >>>>>>>>>>     BrokerHosts zk = new ZkHosts("zookeeper1");
> >>>>>>>>>>     TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
> >>>>>>>>>>
> >>>>>>>>>>     kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> >>>>>>>>>>     kafkaConf.fetchSizeBytes = 10000;
> >>>>>>>>>>     kafkaConf.forceFromStart = true;
> >>>>>>>>>>
> >>>>>>>>>>     Config stormConfig = new Config();
> >>>>>>>>>>     stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
> >>>>>>>>>>     stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
> >>>>>>>>>>     stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
> >>>>>>>>>>     stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
> >>>>>>>>>>     // performance settings
> >>>>>>>>>>     stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
> >>>>>>>>>>     stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
> >>>>>>>>>>     stormConfig.setMaxSpoutPending(100000);
> >>>>>>>>>>
> >>>>>>>>>>     if (args != null && args.length > 0) {
> >>>>>>>>>>         StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
> >>>>>>>>>>                 BuildTridentScoreTopology.build(kafkaConf));
> >>>>>>>>>>     } else {...}
> >>>>>>>>>>
> >>>>>>>>>> Now, I've created the 'scores' topic in Kafka and pushed a few test messages prior to starting the topology, with kafkaConf.forceFromStart = true. The topology processed those messages just fine and stored them in the Trident state (Couchbase).
> >>>>>>>>>>
> >>>>>>>>>> All new messages are simply ignored!
> >>>>>>>>>>
> >>>>>>>>>> After redeploying the topology (both with forceFromStart = true and forceFromStart = false), no more messages are ingested from Kafka.
> >>>>>>>>>>
> >>>>>>>>>> Here is the worker log for one topology deployment and a short run:
> >>>>>>>>>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
> >>>>>>>>>>
> >>>>>>>>>> These are the VMs that host this Storm cluster:
> >>>>>>>>>> 10.0.0.241 zookeeper1
> >>>>>>>>>> 10.0.0.101 supervisor1
> >>>>>>>>>> 10.0.0.21 kafka1
> >>>>>>>>>> 10.0.0.251 nimbus1
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Milos
> >>
> >> --
> >> Danijel Schiavuzzi
> >>
> >> E: danijel@schiavuzzi.com
> >> W: www.schiavuzzi.com
> >> T: +385989035562
> >> Skype: danijels7
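Since the thread keeps coming back to the Trident spout metadata stored under /transactional in ZooKeeper (Danijel's rmr /transactional suggestion, Miloš's purge via zkCli.sh, and the /transactional/tridentTestOpaqueSpout/user path in the worker log), here is a hedged sketch for inspecting those znodes with Curator before deciding to delete them. The connect string and root path are taken from this thread; everything else is illustrative:

    import java.util.List;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    // Walks the znodes under /transactional and prints each node's data, so the
    // Trident spout's stored state can be looked at before it is removed.
    public class TransactionalStateDump {
        public static void main(String[] args) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    "zookeeper1:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();
            try {
                dump(client, "/transactional");
            } finally {
                client.close();
            }
        }

        private static void dump(CuratorFramework client, String path) throws Exception {
            byte[] data = client.getData().forPath(path);
            System.out.println(path + " -> " + (data == null ? "" : new String(data, "UTF-8")));
            List<String> children = client.getChildren().forPath(path);
            for (String child : children) {
                dump(client, path + "/" + child);
            }
        }
    }

Dumping this state before and after a redeploy makes it easier to tell whether the spout is resuming from a stale stored offset or not storing one at all.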