Hi ,
Thanks for your notice and I am new to Storm, so my question maybe a little stupid :(.
I run the Trident with "partitionPersist(StateFactory stateFactory, StateUpdater updater)
" :
TridentTopology topology = new TridentTopology();
topology.newStream("emitWord", spout).partitionBy(new Fields("word"))
// .each(new Fields("word"), new C2CFilters.PrintFilter());
//.partitionPersist(jdbcStateFactory, new Fields("word"), new MysqlStateUpdater(),
new Fields("word")).parallelismHint(1);
.partitionPersist(jdbcStateFactory,new JdbcUpdater()).parallelismHint(1);
I noticed the Input tuple to JdbcUpdater#updateState is always null and my process just aborted.
I tried a few hours and finally noticed I have no input fields selector.
So , my question is what is the purpose of such method ?
Thanks very much.
Keith
|