storm-user mailing list archives

From Danijel Schiavuzzi <dani...@schiavuzzi.com>
Subject Re: Kafka trident getting stuck
Date Wed, 09 Jul 2014 19:49:17 GMT
Very strange. Could you try deleting Trident's data in Zookeeper:

$ sh zkCli.sh
rmr /transactional

and then resubmitting the topology and repeating your test scenario?

Maybe the spout's data in Zookeeper somehow got corrupted because you
are setting forceFromStart in the spout and resubmitting the topology
multiple times. I think the transactional topology may be left in an
undefined state in that case.

You could also enable the LoggingMetricsConsumer in storm.yaml, and then
check the Kafka spout's kafka.latestOffset metric in metrics.log, and
compare this offset with the one Kafka's own utility script outputs
(search under kafka/bin/ for the script).
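
For the comparison itself, here is a minimal sketch in Java of what to
look for once you have both numbers. The names OffsetCheck and compare are
made up for illustration (they are not part of Storm or Kafka), both
offsets are copied in by hand, and the reading of each outcome in the
comments is only an assumption on my part:

```java
// Hypothetical helper for illustration; not part of Storm or Kafka.
final class OffsetCheck {

    enum Verdict { MATCH, SPOUT_BEHIND, SPOUT_AHEAD }

    // spoutOffset:  value copied from the spout's offset metric in metrics.log
    // brokerOffset: latest offset printed by Kafka's own utility script
    static Verdict compare(long spoutOffset, long brokerOffset) {
        if (spoutOffset == brokerOffset) {
            // Assumption: the spout sees everything the broker has.
            return Verdict.MATCH;
        }
        // Assumption: a lower spout value suggests the spout is not seeing
        // new messages; a higher one suggests stale or mismatched state.
        return spoutOffset < brokerOffset ? Verdict.SPOUT_BEHIND : Verdict.SPOUT_AHEAD;
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java OffsetCheck <spoutOffset> <brokerOffset>");
            return;
        }
        long spoutOffset = Long.parseLong(args[0]);
        long brokerOffset = Long.parseLong(args[1]);
        System.out.println(compare(spoutOffset, brokerOffset));
    }
}
```

Run it with the two offsets as arguments. If the result stays at
SPOUT_BEHIND while you keep producing messages, that would point at the
spout rather than at the producer.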

On Wednesday, July 9, 2014, Miloš Solujić <milos.solujic@gmail.com> wrote:

> Yep, I double-checked.
>
> Here is how it's done:
>
> #create topic
> /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1:2181
> --replication-factor 1 --partitions 1 --topic scores
>
> #check what is created
> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181 --describe
> --topic scores
>
> #produce a few messages
> /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092
> --topic scores
>
> #consumer
> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181
> --topic scores --from-beginning
>
>
>
>
> > try enabling Config.setDebug(true) and monitoring the Kafka spout's
> > activity in the logs.
> Did that; only tick tuples are shipped around, nothing else.
>
> > Also, you should paste all your worker logs (worker-*.log files).
> Forgot to mention: only one worker is configured, precisely to simplify
> things.
>
>
>
> Here is a simplified version of this topology (no Trident state, only a
> simple printer bolt):
>
>
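> import backtype.storm.Config;
> import backtype.storm.StormSubmitter;
> import backtype.storm.spout.SchemeAsMultiScheme;
> import backtype.storm.tuple.Fields;
> import storm.kafka.BrokerHosts;
> import storm.kafka.StringScheme;
> import storm.kafka.ZkHosts;
> import storm.kafka.trident.TransactionalTridentKafkaSpout;
> import storm.kafka.trident.TridentKafkaConfig;
> import storm.trident.TridentTopology;
> import storm.trident.operation.BaseFunction;
> import storm.trident.operation.TridentCollector;
> import storm.trident.tuple.TridentTuple;
>
> public class TridentKafkaDeployer {
>
>     // Trident function that prints every tuple it receives and emits nothing.
>     public static class PrinterBolt extends BaseFunction {
>         private static final long serialVersionUID = -5585127152942983256L;
>
>         @Override
>         public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
>             System.out.println(tuple.toString());
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>
>         // Kafka broker metadata is looked up through ZooKeeper.
>         BrokerHosts zk = new ZkHosts("zookeeper1");
>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, "scores");
>
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         kafkaConfig.forceFromStart = true;
>
>         TridentTopology topology = new TridentTopology();
>
>         topology
>             .newStream("raw-scores", new TransactionalTridentKafkaSpout(kafkaConfig))
>                     .name("kafkaSpout")
>             .each(new Fields("str"), new PrinterBolt(),
>                     new Fields("print"));
>
>         Config config = new Config();
>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
>         config.setNumWorkers(1);
>         config.setMaxSpoutPending(3);
>
>         // args[0] is the topology name.
>         StormSubmitter.submitTopologyWithProgressBar(args[0], config, topology.build());
>     }
> }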
>
> Exactly the same behaviour (it reads from exactly the same Kafka topic):
> no fresh messages are picked up from the Kafka topic.
>
>
>
>
>
> On Tue, Jul 8, 2014 at 7:08 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com>
> wrote:
>
>> Also, you should paste all your worker logs (worker-*.log files).
>>
>>
>> On Tuesday, July 8, 2014, Danijel Schiavuzzi <danijel@schiavuzzi.com>
>> wrote:
>>
>>> I'd double check the Kafka producer to make sure those messages are
>>> really getting into the right Kafka topic. Also,
>>> try enabling Config.setDebug(true) and monitoring the Kafka spout's
>>> activity in the logs. setMaxSpoutPending should always be set, as by
>>> default it is unset, so you risk internal queue explosion.
>>>
>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com> wrote:
>>>
>>>> Yep, pretty sure. I use the internal kafka-producer.sh; the same method
>>>> was used to produce the initial messages (before the first launch of the
>>>> topology), and those were consumed and processed just fine.
>>>>
>>>> As for maxSpoutPending, first I tried 10, then removed it (left the
>>>> default value).
>>>>
>>>>
>>>> On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi <
>>>> danijel@schiavuzzi.com> wrote:
>>>>
>>>>> Are you sure you are producing new messages into the same Kafka
>>>>> topic? What number did you set maxSpoutPending to?
>>>>>
>>>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks, Danijel, for your quick suggestion.
>>>>>>
>>>>>> I tried lowering and then removing all the performance settings (they
>>>>>> were left over from load testing on one machine).
>>>>>>
>>>>>> Still the same result: no matter what, new messages are not taken from
>>>>>> Kafka after the topology is redeployed.
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <
>>>>>> danijel@schiavuzzi.com> wrote:
>>>>>>
>>>>>>> Try lowering setMaxSpoutPending(100000) to a much lower value (like
>>>>>>> 10). In Trident, setMaxSpoutPending refers to the number of batches,
>>>>>>> not tuples as in plain Storm. Values that are too high may cause
>>>>>>> blockages like the one you describe.
>>>>>>>
>>>>>>>
>>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solujic@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope that my
>>>>>>>> question is not too dumb. Here it goes:
>>>>>>>>
>>>>>>>> I'm using the latest stable storm and storm-kafka = 0.9.2-incubating
>>>>>>>>
>>>>>>>> I've set up a test cluster using the wirbelsturm tool with an
>>>>>>>> unchanged yaml (just uncommented the kafka machine)
>>>>>>>>
>>>>>>>> here is config snippet for my trident topology:
>>>>>>>>
>>>>>>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
>>>>>>>>         TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
>>>>>>>>
>>>>>>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>>>         kafkaConf.fetchSizeBytes = 10000;
>>>>>>>>         kafkaConf.forceFromStart = true;
>>>>>>>>
>>>>>>>>         Config stormConfig = new Config();
>>>>>>>>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>>>>>>>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>>>>>>>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>>>>>>>         stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>>>>>>>>         // performance settings
>>>>>>>>         stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>>>>>>>>         stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>>>>>>>>         stormConfig.setMaxSpoutPending(100000);
>>>>>>>>
>>>>>>>>         if (args != null && args.length > 0) {
>>>>>>>>             StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
>>>>>>>>                     BuildTridentScoreTopology.build(kafkaConf));
>>>>>>>>         } else {...}
>>>>>>>>
>>>>>>>> Now, I've created the 'scores' topic in Kafka and pushed a few test
>>>>>>>> messages prior to starting the topology, with
>>>>>>>> kafkaConf.forceFromStart = true. The topology processed those
>>>>>>>> messages just fine and stored them in tridentState (Couchbase).
>>>>>>>>
>>>>>>>> All new messages are simply ignored!
>>>>>>>>
>>>>>>>> After redeploying the topology (both with forceFromStart = true and
>>>>>>>> forceFromStart = false) no more messages are ingested from Kafka.
>>>>>>>>
>>>>>>>> Here is the worker log for one topology deployment and a short run:
>>>>>>>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>>>>>>>
>>>>>>>> These are the VMs that host this Storm cluster:
>>>>>>>> 10.0.0.241 zookeeper1
>>>>>>>> 10.0.0.101 supervisor1
>>>>>>>> 10.0.0.21 kafka1
>>>>>>>> 10.0.0.251 nimbus1
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Milos
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Danijel Schiavuzzi
>>>>>>>
>>>>>>> E: danijel@schiavuzzi.com
>>>>>>> W: www.schiavuzzi.com
>>>>>>> T: +385989035562
>>>>>>> Skype: danijels7
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
>

-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7
