Hi Edison,

Sounds good. Could you please post a link to your pull request here?

-------------
Matt


On Jan 22, 2014, at 4:06 PM, Edison <xeseo2005@gmail.com> wrote:

I got the refresh issue solved. It was caused by a bug: GlobalPartitionInformation didn't override the equals() method of Object.
Thus, every time Trident checks whether the partition info has changed by invoking equals(), the comparison falls back to object identity, reports a change, and Trident refreshes the partitions.
I've submitted a pull request to the author, and it will likely be merged into master soon.
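
To illustrate the kind of fix, here is a minimal sketch of a value-based equals()/hashCode() pair. PartitionInfoSketch is a hypothetical stand-in, not the real GlobalPartitionInformation, and its field layout (partition id mapped to a broker address) is an assumption:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for GlobalPartitionInformation; the field layout is an assumption.
final class PartitionInfoSketch {
    // partition id -> "host:port" of the broker serving it
    private final Map<Integer, String> partitionToBroker = new TreeMap<>();

    void addPartition(int partitionId, String broker) {
        partitionToBroker.put(partitionId, broker);
    }

    // Without this override, Object.equals() compares references, so two
    // objects describing the same partitions never compare as equal.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof PartitionInfoSketch)) {
            return false;
        }
        return partitionToBroker.equals(((PartitionInfoSketch) other).partitionToBroker);
    }

    // equals() and hashCode() must be overridden together.
    @Override
    public int hashCode() {
        return partitionToBroker.hashCode();
    }
}
```

With this in place, two instances built from the same partition layout compare as equal, so a change check based on equals() only fires when the layout really differs.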


2014/1/13 Matt <yayjsir@gmail.com>
Hi Edison,

Yes, I have already resolved these problems. And thanks a lot for your help.

-------------
Matt


On Jan 13, 2014, at 4:08 PM, Edison <xeseo2005@gmail.com> wrote:

Hi Matt,

For the serialization problem, I feel the same as you. They could use Kryo for better code reuse. I guess this function "just works", and the contributors haven't heard any strong complaints, so they have left it at a lower priority?

BTW, in Trident, the Emitter interface requires an implementation of "void refreshPartitions(List<Partition> partitionResponsibilities);", and the comment says this is used to manage things like connections to brokers.
In the TridentKafkaEmitter implementation in the project I shared with you, the connection to the Kafka broker is closed every time refreshPartitions is called. So the Kafka server log keeps filling up with junk messages about closed sockets.
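
One way to avoid the socket churn would be for a refreshPartitions-style hook to close only the connections that are no longer needed. The sketch below is hypothetical: ConnectionRegistrySketch is not part of Storm or storm-kafka-0.8-plus, and it tracks broker addresses in a set instead of holding real sockets:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical helper; names and structure are assumptions for illustration.
final class ConnectionRegistrySketch {
    private final Set<String> open = new HashSet<>();
    private int closeCount = 0;

    // Intended to be called from a refreshPartitions-style hook with the
    // brokers that the currently assigned partitions still need.
    void refresh(Collection<String> neededBrokers) {
        Set<String> needed = new HashSet<>(neededBrokers);
        Iterator<String> it = open.iterator();
        while (it.hasNext()) {
            if (!needed.contains(it.next())) {
                it.remove();   // a real implementation would close the socket here
                closeCount++;
            }
        }
        open.addAll(needed);   // a real implementation would connect to new brokers here
    }

    Set<String> openBrokers() {
        return Collections.unmodifiableSet(open);
    }

    int closeCount() {
        return closeCount;
    }
}
```

Calling refresh twice with the same broker list closes nothing, so repeated refreshes would no longer show up as socket-close messages on the broker side.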

In OpaquePartitionedTridentSpoutExecutor.emitBatch, Storm clearly tries to "refresh" the partitions every time it updates the coordinator meta, but why does it have to do this? To avoid lost connections or something?
Do you have any clue?


2014/1/13 Matt <yayjsir@gmail.com>
Hi Edison,

Thanks for your reply. It does help, I think, because it does work. But I am still curious about why, because _partitionStates is also flushed when using storm-kafka-0.8-plus.

In the end, I realized that the root cause was not the flushing of _partitionStates. The real reason is that I used my own custom class to represent lastPartitionMeta. My spout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Batch>, while storm-kafka-0.8-plus implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map>.

The last type argument (Batch/Map) is serialized and stored in ZooKeeper. Storm reads the last batch information from ZooKeeper if it’s not stored locally. But Storm serializes this instance with a hardcoded call to json-simple (https://github.com/nathanmarz/storm/blob/0.9.0.1/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java#L52), and json-simple doesn’t use reflection, which means it doesn’t support custom classes.

In other words, Storm implicitly requires that batch information be a Map, a String, or another type supported by json-simple. But no documentation mentions that.
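
As a rough sketch of a workaround, a custom batch class can be converted to and from a plain Map at the spout boundary, so that only JSON-friendly values get stored. BatchMetaSketch and its field names are hypothetical, and the conversion uses only java.util rather than any Storm or json-simple API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical batch metadata; the fields are assumptions for illustration.
final class BatchMetaSketch {
    final long offset;
    final long nextOffset;

    BatchMetaSketch(long offset, long nextOffset) {
        this.offset = offset;
        this.nextOffset = nextOffset;
    }

    // Flatten to a Map of strings and numbers, which a plain JSON
    // serializer can handle without reflection.
    Map<String, Object> toMap() {
        Map<String, Object> m = new HashMap<>();
        m.put("offset", offset);
        m.put("nextOffset", nextOffset);
        return m;
    }

    // Rebuild from the stored Map; null means no previous batch exists.
    static BatchMetaSketch fromMap(Map<String, Object> m) {
        if (m == null) {
            return null;
        }
        // Cast through Number because a JSON parser may hand back any numeric type.
        long offset = ((Number) m.get("offset")).longValue();
        long nextOffset = ((Number) m.get("nextOffset")).longValue();
        return new BatchMetaSketch(offset, nextOffset);
    }
}
```

The spout would then declare Map as its last type argument and call toMap()/fromMap() inside the emitter.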

And why not use Kryo, which is already widely used in Storm, to serialize/deserialize batch information?

-------------
Matt


On Jan 6, 2014, at 4:42 PM, Edison <xeseo2005@gmail.com> wrote:



2014/1/1 yayj <yayjsir@gmail.com>
Hi there,

I recently set out to implement a partitioned Trident spout for Kafka based on Storm 0.9.0.1, because storm-contrib/storm-kafka no longer seems to be under development and is not compatible with 0.9. But I found that my custom Emitter implementing storm.trident.spout.IPartitionedTridentSpout.Emitter didn’t work.

The reason I found was that the lastPartitionMeta parameter of the emitPartitionBatchNew method was always null in ANY transaction. That’s not acceptable, since the JavaDoc of Emitter.emitPartitionBatchNew says it “returns the metadata that can be used to reconstruct this partition/batch in the future.” (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java#L52) Furthermore, my implementation is similar to what storm-kafka does (https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/trident/KafkaUtils.java#L55).

In the end, I found that the root cause lies in commit d6c2736 (revamp trident spout and partitioned trident spouts to support spouts where the source can change). In this commit, PartitionedTridentSpoutExecutor.Emitter.emitBatch() uses _savedCoordinatorMeta, which is returned by Coordinator.getPartitionsForBatch(), to check for partition changes, as described in the commit log. If the partitions have changed, it FIRST FLUSHES _partitionStates (line 107: _partitionStates.clear()), which stores ALL partition states, and then invokes Emitter.getOrderedPartitions() to regenerate the partition information. (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L105)

Frankly speaking, I think it’s too arbitrary. The semantics of the operation on _partitionStates become REPLACEMENT, but I think it should be an update, which means adding new ones, removing lost ones, and KEEPING STABLE ONES. I don’t think it’s logical that when one partition changes, all partitions in the same txid must reconstruct their states.
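
The update semantics I have in mind can be sketched as follows. This is not Storm code: PartitionStateMergeSketch and its merge method are hypothetical, partition ids are assumed to be Strings, and the state type is left generic:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper showing "update" instead of "replace" semantics.
final class PartitionStateMergeSketch {
    // Removes states of lost partitions, creates states for new partitions,
    // and leaves the states of unchanged partitions untouched.
    static <S> void merge(Map<String, S> states,
                          Collection<String> currentPartitionIds,
                          Function<String, S> newStateFactory) {
        // Drop partitions that no longer exist.
        states.keySet().retainAll(new HashSet<>(currentPartitionIds));
        // Add partitions that appeared; existing entries are kept as they are.
        for (String id : currentPartitionIds) {
            states.computeIfAbsent(id, newStateFactory);
        }
    }
}
```

Compared with calling clear() and rebuilding everything, a stable partition keeps its state object and therefore its last batch metadata.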

So, I’m wondering whether my understanding is correct. And is it feasible to send a pull request to https://github.com/apache/incubator-storm ?


-------------
Matt