storm-user mailing list archives

From: Miloš Solujić <milos.solu...@gmail.com>
Subject: Re: Kafka trident getting stuck
Date: Tue, 08 Jul 2014 16:27:48 GMT
Thanks, Danijel, for the quick suggestion.

I tried lowering and then removing all of the performance settings (they were
left over from load testing on a single machine).

Still the same result: no matter what, new messages are not consumed from Kafka
after the topology is redeployed.


On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com>
wrote:

> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10).
> In Trident, setMaxSpoutPending refers to the number of batches, not tuples
> as in plain Storm. Values that are too high may cause blockages like the
> one you describe.
>
>
> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com> wrote:
>
>> Hi all,
>>
>> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope that my question
>> is not too dumb. Here it goes:
>>
>> I'm using the latest stable Storm and storm-kafka, 0.9.2-incubating.
>>
>> I've set up a test cluster using the Wirbelsturm tool with an unchanged
>> YAML (I just uncommented the Kafka machine).
>>
>> Here is the config snippet for my Trident topology:
>>
>>         BrokerHosts zk = new ZkHosts("zookeeper1");
>>         TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
>>
>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>         kafkaConf.fetchSizeBytes = 10000;
>>         kafkaConf.forceFromStart = true;
>>
>>         Config stormConfig = new Config();
>>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>         stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>>         // performance settings
>>         stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>>         stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>>         stormConfig.setMaxSpoutPending(100000);
>>
>>         if (args != null && args.length > 0) {
>>             StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
>>                     BuildTridentScoreTopology.build(kafkaConf));
>>         } else {...}
>>
>> Now, I've created a 'scores' topic in Kafka and pushed a few test messages
>> prior to starting the topology, with kafkaConf.forceFromStart = true. The
>> topology processed those messages just fine and stored them in the
>> tridentState (Couchbase).
>>
>> But all new messages are simply ignored!
>>
>> After redeploying the topology (both with forceFromStart = true and with
>> forceFromStart = false), no more messages are ingested from Kafka.
>>
>> Here is the worker log for one topology deployment and a short run:
>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>
>> These are the VMs that host this Storm cluster:
>> 10.0.0.241 zookeeper1
>> 10.0.0.101 supervisor1
>> 10.0.0.21 kafka1
>> 10.0.0.251 nimbus1
>>
>> Thanks,
>> Milos
>>
>
> --
> Danijel Schiavuzzi
>
> E: danijel@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
>
