Thanks, Danijel, for your quick suggestion.
I tried lowering and then removing all of the performance settings (those were
left over from load testing on a single machine).
Still the same result: no matter what, new messages are not consumed from Kafka
after the topology is redeployed.
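For reference, here is a minimal sketch of the lowered configuration I tried (10 is the batch count suggested below; everything else is unchanged from my original snippet, with the other performance settings simply removed):

```java
import backtype.storm.Config;

public class LoweredConfigSketch {
    public static Config build(String nimbusIp) {
        Config stormConfig = new Config();
        stormConfig.put(Config.NIMBUS_HOST, nimbusIp);
        // In Trident, max spout pending counts in-flight *batches*, not tuples,
        // so 10 here means at most 10 unacked batches, not 10 tuples.
        stormConfig.setMaxSpoutPending(10); // was 100000
        // TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS and
        // RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF removed entirely.
        return stormConfig;
    }
}
```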
On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com>
wrote:
> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10).
> In Trident, setMaxSpoutPending refers to the number of batches, not tuples
> as in plain Storm. Values that are too high may cause blockages like the one
> you describe.
>
>
> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com> wrote:
>
>> Hi all,
>>
>> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope that my question
>> is not too dumb. Here it goes:
>>
>> I'm using latest stable storm and storm-kafka = 0.9.2-incubating
>>
>> I've set up a test cluster using the Wirbelsturm tool with an unchanged
>> YAML (I just uncommented the Kafka machine).
>>
>> Here is the config snippet for my Trident topology:
>>
>> BrokerHosts zk = new ZkHosts("zookeeper1");
>> TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk,
>> "scores");
>>
>> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>> kafkaConf.fetchSizeBytes = 10000;
>> kafkaConf.forceFromStart = true;
>>
>> Config stormConfig = new Config();
>> stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>> stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>> stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>> stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>> // performance settings
>>
>> stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>> stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>> stormConfig.setMaxSpoutPending(100000);
>>
>>
>> if (args != null && args.length > 0) {
>>
>> StormSubmitter.submitTopologyWithProgressBar(args[0],
>> stormConfig,
>> BuildTridentScoreTopology.build(kafkaConf));
>> } else {...}
>>
>> Now, I've created a 'scores' topic in Kafka and pushed a few test messages
>> prior to starting the topology, with kafkaConf.forceFromStart = true. The
>> topology processed those messages just fine and stored them in the
>> tridentState (Couchbase).
>>
>> But all new messages are simply ignored!
>>
>> After redeploying the topology (both with forceFromStart = true and with
>> forceFromStart = false), no more messages are ingested from Kafka.
>>
>> Here is the worker log from one topology deployment and a short run:
>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>
>> These are the VMs that host this Storm cluster:
>> 10.0.0.241 zookeeper1
>> 10.0.0.101 supervisor1
>> 10.0.0.21 kafka1
>> 10.0.0.251 nimbus1
>>
>> Thanks,
>> Milos
>>
>
> --
> Danijel Schiavuzzi
>
> E: danijel@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
>