storm-user mailing list archives

From Miloš Solujić <milos.solu...@gmail.com>
Subject Re: Kafka trident getting stuck
Date Thu, 10 Jul 2014 12:08:07 GMT
Yes
On 10 Jul 2014 14:03, "Danijel Schiavuzzi" <danijel@schiavuzzi.com> wrote:

> Did you kill your topology before clearing the Zookeeper data?
>
> On Jul 10, 2014 1:24 PM, "Miloš Solujić" <milos.solujic@gmail.com> wrote:
> >
> > Thanks, Danijel, for taking an interest in my problem.
> >
> > Exactly the same feeling I've got (that the Zookeeper data is corrupted), so I purged that info via zkCli.sh.
> >
> > Now I've got some lower-level issues:
> >
> > 2014-07-10 11:00:13 b.s.d.worker [INFO] Worker
> 04a17a6b-5aea-47ce-808b-218c4bcc1d51 for storm
> tridentOpaqueTest-topology-3-1404989752 on
> e4cdd619-c7e4-40f0-941e-352ac41daf6e:6701 has finished loading
> > 2014-07-10 11:00:14 s.k.DynamicBrokersReader [INFO] Read partition info
> from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092}}
> > 2014-07-10 11:00:14 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> > 2014-07-10 11:00:14 o.a.z.ZooKeeper [INFO] Initiating client connection,
> connectString=zookeeper1:2181 sessionTimeout=20000
> watcher=org.apache.curator.ConnectionState@203c6f3e
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Opening socket connection to
> server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using
> SASL (unknown error)
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Socket connection
> established to zookeeper1/10.0.0.241:2181, initiating session
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Session establishment
> complete on server zookeeper1/10.0.0.241:2181, sessionid =
> 0x1471fbc929c00f4, negotiated timeout = 20000
> > 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [INFO] State
> change: CONNECTED
> > 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [WARN] There are no
> ConnectionStateListeners registered.
> > 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Session: 0x1471fbc929c00f4
> closed
> > 2014-07-10 11:00:15 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] EventThread shut down
> > 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Initiating client connection,
> connectString=zookeeper1:2181/transactional/tridentTestOpaqueSpout/user
> sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@4fa983e
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Opening socket connection to
> server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using
> SASL (unknown error)
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Socket connection
> established to zookeeper1/10.0.0.241:2181, initiating session
> > 2014-07-10 11:00:15 b.s.d.executor [INFO] Prepared bolt spout0:(5)
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Session establishment
> complete on server zookeeper1/10.0.0.241:2181, sessionid =
> 0x1471fbc929c00f5, negotiated timeout = 20000
> > 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [INFO] State
> change: CONNECTED
> > 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [WARN] There are no
> ConnectionStateListeners registered.
> > 2014-07-10 11:00:16 b.s.util [ERROR] Async loop died!
> > java.lang.RuntimeException: java.net.ConnectException: Connection refused
> >         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
> >         at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_30]
> > Caused by: java.net.ConnectException: Connection refused
> >         at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_30]
> >         at
> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
> ~[na:1.6.0_30]
> >         at
> kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> ~[stormjar.jar:na]
> >         at
> kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> ~[stormjar.jar:na]
> >         at
> kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
> ~[stormjar.jar:na]
> >         at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> ~[stormjar.jar:na]
> >         at
> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124)
> ~[stormjar.jar:na]
> >         at
> kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
> ~[stormjar.jar:na]
> >         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77)
> ~[stormjar.jar:na]
> >         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:111)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194)
> ~[stormjar.jar:na]
> >         at
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         ... 6 common frames omitted
> > 2014-07-10 11:00:16 b.s.d.executor [ERROR]
> >
> > I've specified in the simple test topology that I want only one worker, so after this error the supervisor boots up another worker, which fails with the same error after about a minute of running.
> >
> > What is this about, and how can I fix it? (The setup is the same: a default Wirbelsturm cluster running on a 64-bit Ubuntu machine.)
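> >
> > One thing I notice in the log above: the partition info read from Zookeeper says the broker is at localhost:9092, yet the worker runs on supervisor1 and Kafka on kafka1, so a connection to localhost would be refused. Could the fix be to advertise a resolvable hostname in the broker's server.properties? Just a guess on my side, assuming Kafka 0.8.1+ (where these broker properties exist):
> >
> >         # server.properties on kafka1: advertise a hostname the workers can resolve
> >         advertised.host.name=kafka1
> >         advertised.port=9092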
> >
> > By the way, here is the new test topology, using the opaque Kafka spout (the same thing happens with the transactional one too):
> >
> >
> > public class TridentKafkaOpaqueDeployer {
> >
> >     public static class PrinterBolt extends BaseFunction {
> >         private static final long serialVersionUID = -5585127152942983256L;
> >
> >         @Override
> >         public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
> >             System.out.println(tuple.toString());
> >         }
> >     }
> >
> >     public static void main(String[] args) throws Exception {
> >
> >         BrokerHosts brokerHosts = new ZkHosts("zookeeper1");
> >         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "tridentOpaqueTest");
> >
> >         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> > //        kafkaConfig.forceFromStart = true;
> >
> >         TridentTopology topology = new TridentTopology();
> >
> >         topology
> >             .newStream("tridentTestOpaqueSpout", new OpaqueTridentKafkaSpout(kafkaConfig))
> >                 .name("tridentTestOpaqueSpout")
> >             .each(new Fields("str"), new PrinterBolt(), new Fields("print"));
> >
> >         Config config = new Config();
> >         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
> >         config.setNumWorkers(1);
> >         config.setMaxSpoutPending(3);
> >
> >         StormSubmitter.submitTopologyWithProgressBar(args[0], config, topology.build());
> >     }
> > }
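> >
> > For reference, I submit it with the standard storm CLI (the jar path and topology name below are illustrative):
> >
> >         storm jar target/trident-test.jar TridentKafkaOpaqueDeployer tridentOpaqueTest-topology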
> >
> >
> > // note
> >
> > I also resolved the issue with reusing a spout name across topologies, per the recommendation Nathan gave some time ago:
> > https://groups.google.com/forum/#!topic/storm-user/Tn43K1eGcKY
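> >
> > A minimal sketch of that workaround (the root path below is made up for illustration): besides giving each topology's stream a unique txid name, a topology can also be pointed at its own transactional root in Zookeeper:
> >
> >         // hypothetical: isolate this topology's Trident spout metadata in ZK
> >         config.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/transactional-opaque-test");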
> >
> >
> >
> >
> > On Wed, Jul 9, 2014 at 9:49 PM, Danijel Schiavuzzi <
> danijel@schiavuzzi.com> wrote:
> >>
> >> Very strange. Could you try deleting Trident's data in Zookeeper:
> >>
> >> $ sh zkCli.sh
> >> rmr /transactional
> >>
> >> and then resubmitting the topology and repeating your test scenario?
> >>
> >> Maybe the spout's data in Zookeeper somehow got corrupted because you are setting forceFromStart in the spout and resubmitting the topology multiple times. I think the transactional topology may be left in an undefined state in that case.
> >>
> >> You could also enable the LoggingMetricsConsumer in storm.yaml, and
> then check the Kafka spout's kafka.latestOffset metric in metrics.log, and
> compare this offset with the one Kafka's own utility script outputs
> (search under kafka/bin/ for the script).
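> >>
> >> A minimal sketch of both checks, in case it saves a lookup (the parallelism hint is illustrative): the metrics consumer can also be registered in code rather than storm.yaml, and Kafka 0.8 ships kafka.tools.GetOffsetShell for reading a topic's latest offset:
> >>
> >>         // register Storm's built-in logging metrics consumer (writes metrics.log)
> >>         config.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class, 1);
> >>
> >>         # latest offset for the topic, straight from the broker (--time -1 means latest)
> >>         /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic scores --time -1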
> >>
> >> On Wednesday, July 9, 2014, Miloš Solujić <milos.solujic@gmail.com>
> wrote:
> >>>
> >>> Yep, I did double checked.
> >>>
> >>> Here is how it's done:
> >>>
> >>> #create topic
> >>> /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1:2181
> --replication-factor 1 --partition 1 --topic scores
> >>>
> >>> #check what is created
> >>> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181 --describe
> --topic scores
> >>>
> >>> #produce a few messages
> >>> /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092
> --topic scores
> >>>
> >>> #consumer
> >>> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181
> --topic scores --from-beginning
> >>>
> >>>
> >>>
> >>>
> >>> > try enabling Config.setDebug(true) and monitoring the Kafka spout's activity in the logs.
> >>> Did that; only tick tuples are shipped around, nothing else.
> >>>
> >>> > Also, you should paste all your worker logs (worker-*.log files).
> >>> Forgot to mention: only one worker is set, precisely to keep things simple.
> >>>
> >>>
> >>>
> >>> Here is a simplified version of the topology (no Trident state, only a simple printer bolt):
> >>>
> >>>
> >>> public class TridentKafkaDeployer {
> >>>
> >>>     public static class PrinterBolt extends BaseFunction {
> >>>         private static final long serialVersionUID = -5585127152942983256L;
> >>>
> >>>         @Override
> >>>         public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
> >>>             System.out.println(tuple.toString());
> >>>         }
> >>>     }
> >>>
> >>>     public static void main(String[] args) throws Exception {
> >>>
> >>>         BrokerHosts zk = new ZkHosts("zookeeper1");
> >>>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, "scores");
> >>>
> >>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> >>>         kafkaConfig.forceFromStart = true;
> >>>
> >>>         TridentTopology topology = new TridentTopology();
> >>>
> >>>         topology
> >>>             .newStream("raw-scores", new TransactionalTridentKafkaSpout(kafkaConfig))
> >>>                 .name("kafkaSpout")
> >>>             .each(new Fields("str"), new PrinterBolt(), new Fields("print"));
> >>>
> >>>         Config config = new Config();
> >>>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
> >>>         config.setNumWorkers(1);
> >>>         config.setMaxSpoutPending(3);
> >>>
> >>>         StormSubmitter.submitTopologyWithProgressBar(args[0], config, topology.build());
> >>>     }
> >>> }
> >>>
> >>> Exactly the same behaviour (it consumes from exactly the same Kafka topic): no fresh messages are picked up from the topic.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On Tue, Jul 8, 2014 at 7:08 PM, Danijel Schiavuzzi <
> danijel@schiavuzzi.com> wrote:
> >>>>
> >>>> Also, you should paste all your worker logs (worker-*.log files).
> >>>>
> >>>>
> >>>> On Tuesday, July 8, 2014, Danijel Schiavuzzi <danijel@schiavuzzi.com>
> wrote:
> >>>>>
> >>>>> I'd double-check the Kafka producer to make sure those messages are really getting into the right Kafka topic. Also, try enabling Config.setDebug(true) and monitoring the Kafka spout's activity in the logs. setMaxSpoutPending should always be set, as it is unset by default, so you risk internal queue explosion.
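> >>>>>
> >>>>> A minimal sketch of both knobs together (the values are illustrative only):
> >>>>>
> >>>>>         Config config = new Config();
> >>>>>         config.setDebug(true);         // log every emitted tuple
> >>>>>         config.setMaxSpoutPending(10); // cap in-flight Trident batches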
> >>>>>
> >>>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com>
> wrote:
> >>>>>>
> >>>>>> Yep, pretty much sure, via the internal kafka-console-producer.sh.
> >>>>>> The same method was used to produce the initial messages (before the first launch of the topology), and those got consumed and processed just fine.
> >>>>>>
> >>>>>> As for maxSpoutPending, first I tried 10, then removed it (left the default value).
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi <
> danijel@schiavuzzi.com> wrote:
> >>>>>>>
> >>>>>>> Are you sure you are producing new messages into the same Kafka topic? What number did you set maxSpoutPending to?
> >>>>>>>
> >>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com>
> wrote:
> >>>>>>>>
> >>>>>>>> Thanks Danijel for your quick proposition.
> >>>>>>>>
> >>>>>>>> I tried lowering and then removing all the performance settings (those were left over from load testing on one machine).
> >>>>>>>>
> >>>>>>>> Still the same result: no matter what, new messages are not taken from Kafka after the topology is redeployed.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <
> danijel@schiavuzzi.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10). In Trident, setMaxSpoutPending refers to the number of batches, not tuples as in plain Storm. Too-high values may cause blockages like the one you describe.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com>
> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I'm pretty new to Storm and Kafka/Zookeeper, and I hope my question is not too dumb. Here it goes:
> >>>>>>>>>>
> >>>>>>>>>> I'm using the latest stable Storm and storm-kafka = 0.9.2-incubating.
> >>>>>>>>>>
> >>>>>>>>>> I've set up a test cluster using the Wirbelsturm tool with an unchanged YAML (just uncommented the Kafka machine).
> >>>>>>>>>>
> >>>>>>>>>> Here is the config snippet for my Trident topology:
> >>>>>>>>>>
> >>>>>>>>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
> >>>>>>>>>>         TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
> >>>>>>>>>>
> >>>>>>>>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> >>>>>>>>>>         kafkaConf.fetchSizeBytes = 10000;
> >>>>>>>>>>         kafkaConf.forceFromStart = true;
> >>>>>>>>>>
> >>>>>>>>>>         Config stormConfig = new Config();
> >>>>>>>>>>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
> >>>>>>>>>>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
> >>>>>>>>>>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
> >>>>>>>>>>         stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
> >>>>>>>>>>         // performance settings
> >>>>>>>>>>         stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
> >>>>>>>>>>         stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
> >>>>>>>>>>         stormConfig.setMaxSpoutPending(100000);
> >>>>>>>>>>
> >>>>>>>>>>         if (args != null && args.length > 0) {
> >>>>>>>>>>             StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
> >>>>>>>>>>                     BuildTridentScoreTopology.build(kafkaConf));
> >>>>>>>>>>         } else {...}
> >>>>>>>>>>
> >>>>>>>>>> Now, I've created the 'scores' topic in Kafka and pushed a few test messages prior to starting the topology, with kafkaConf.forceFromStart = true. The topology processed those messages just fine and stored them in the Trident state (Couchbase).
> >>>>>>>>>>
> >>>>>>>>>> All new messages are simply ignored!
> >>>>>>>>>>
> >>>>>>>>>> After redeploying the topology (both with forceFromStart = true and forceFromStart = false), no more messages are ingested from Kafka.
> >>>>>>>>>>
> >>>>>>>>>> Here is the worker log for one topology deployment and a short run: http://pastie.org/private/4xsk6pijvmulwrcg7zgca
> >>>>>>>>>>
> >>>>>>>>>> These are the VMs that host this Storm cluster:
> >>>>>>>>>> 10.0.0.241 zookeeper1
> >>>>>>>>>> 10.0.0.101 supervisor1
> >>>>>>>>>> 10.0.0.21 kafka1
> >>>>>>>>>> 10.0.0.251 nimbus1
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Milos
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Danijel Schiavuzzi
> >>>>>>>>>
> >>>>>>>>> E: danijel@schiavuzzi.com
> >>>>>>>>> W: www.schiavuzzi.com
> >>>>>>>>> T: +385989035562
> >>>>>>>>> Skype: danijels7
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Danijel Schiavuzzi
> >>>>>>>
> >>>>>>> E: danijel@schiavuzzi.com
> >>>>>>> W: www.schiavuzzi.com
> >>>>>>> T: +385989035562
> >>>>>>> Skype: danijels7
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Danijel Schiavuzzi
> >>>>>
> >>>>> E: danijel@schiavuzzi.com
> >>>>> W: www.schiavuzzi.com
> >>>>> T: +385989035562
> >>>>> Skype: danijels7
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Danijel Schiavuzzi
> >>>>
> >>>> E: danijel@schiavuzzi.com
> >>>> W: www.schiavuzzi.com
> >>>> T: +385989035562
> >>>> Skype: danijels7
> >>>
> >>>
> >>
> >>
> >> --
> >> Danijel Schiavuzzi
> >>
> >> E: danijel@schiavuzzi.com
> >> W: www.schiavuzzi.com
> >> T: +385989035562
> >> Skype: danijels7
> >
> >
>
