storm-user mailing list archives

From Matt <yayj...@gmail.com>
Subject Re: PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null
Date Wed, 22 Jan 2014 14:30:32 GMT
Hi Edison,

Sounds good, and could you please publish your pull request here?

-------------
Matt


On 22 Jan 2014, at 16:06, Edison <xeseo2005@gmail.com> wrote:

> I got the refreshing issue solved. It was caused by a bug: GlobalPartitionInformation
> didn't override the equals() method of Object.
> Thus, every time Trident checks whether the partition info has changed by invoking
> the equals() method, the check fails and it refreshes the partitions.
> I've submitted a pull request to the author, and it will likely be merged into master soon.
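For reference, a minimal sketch of the kind of fix Edison describes (a simplified stand-in, not the actual storm-kafka class): without equals() and hashCode() overrides, two instances holding identical partition info compare by reference, so Trident's change check always reports a difference.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration of the bug fix: delegate equality to the
// underlying partition map instead of inheriting Object's reference equality.
public class GlobalPartitionInformation {
    // partition id -> broker "host:port" (illustrative representation)
    private final Map<Integer, String> partitionMap = new TreeMap<>();

    public void addPartition(int partition, String broker) {
        partitionMap.put(partition, broker);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GlobalPartitionInformation)) return false;
        return partitionMap.equals(((GlobalPartitionInformation) o).partitionMap);
    }

    @Override
    public int hashCode() {
        // must be consistent with equals()
        return partitionMap.hashCode();
    }
}
```

With this override, two separately fetched snapshots of the same partition layout compare equal and no spurious refresh is triggered.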
> 
> 
> 2014/1/13 Matt <yayjsir@gmail.com>
> Hi Edison,
> 
> Yes, I have already resolved these problems. And thanks a lot for your help.
> 
> -------------
> Matt
> 
> 
On 13 Jan 2014, at 16:08, Edison <xeseo2005@gmail.com> wrote:
> 
>> Hi Matt,
>> 
>> For the serialization problem, I feel the same as you. They could use Kryo for
>> better code reuse. I guess this feature "just works", and the contributors haven't
>> heard any strong demand, so they just leave it there at a lower priority.
>> 
>> BTW, in Trident, the Emitter interface requires an implementation of "void
>> refreshPartitions(List<Partition> partitionResponsibilities);", whose comments say it is
>> used to manage things like connections to brokers.
>> In the implementation of TridentKafkaEmitter, in the project I shared with you, it
>> closes the connection to the Kafka broker every time refreshPartitions() is called. So
>> the Kafka server log keeps filling with junk messages about sockets being closed.
>> 
>> In OpaquePartitionedTridentSpoutExecutor.emitBatch(), Storm clearly tries to "refresh"
>> the partitions every time it updates the coordinator metadata, but why does it have to
>> do this? To avoid lost connections or something?
>> Do you get any clue?
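One way to avoid the churn described above would be to cache connections per partition and close only those belonging to partitions the task no longer owns. A hypothetical sketch (ConnectionCache and its Factory are illustrative helpers, not Trident or storm-kafka API):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-partition connection cache: refresh() keeps connections
// for retained partitions, closes connections for lost ones, and opens
// connections for newly assigned ones.
public class ConnectionCache<C extends Closeable> {

    /** Opens a connection for a given partition id (illustrative). */
    public interface Factory<C> {
        C connect(int partition);
    }

    private final Map<Integer, C> connections = new HashMap<>();

    public void refresh(List<Integer> assigned, Factory<C> factory) {
        // close and drop connections for partitions no longer assigned
        for (Map.Entry<Integer, C> e : new HashMap<>(connections).entrySet()) {
            if (!assigned.contains(e.getKey())) {
                try {
                    e.getValue().close();
                } catch (IOException ignored) {
                    // best effort: a failed close shouldn't abort the refresh
                }
                connections.remove(e.getKey());
            }
        }
        // open connections only for partitions we don't already have
        for (int p : assigned) {
            connections.computeIfAbsent(p, factory::connect);
        }
    }

    public C get(int partition) {
        return connections.get(partition);
    }
}
```

Under this scheme a refresh with an unchanged assignment is a no-op, so the broker log would no longer fill with socket-close noise.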
>> 
>> 
>> 2014/1/13 Matt <yayjsir@gmail.com>
>> Hi Edison,
>> 
>> Thanks for your reply. I think it does help, because it does work. But I was still
>> curious why, since _partitionStates is also flushed when using storm-kafka-0.8-plus.
>> 
>> In the end, I realized that the root cause was not _partitionStates being flushed. The
>> real reason is that I used my own custom class for lastPartitionMeta. My spout implements
>> IOpaquePartitionedTridentSpout<Partitions, Partition, Batch>, while storm-kafka-0.8-plus
>> implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map>.
>> 
>> The last template argument (Batch/Map) is serialized and stored in ZooKeeper. Storm
>> reads the last batch information from ZooKeeper if it's not stored locally. But Storm
>> performs this serialization with a hardcoded dependency on json-simple (https://github.com/nathanmarz/storm/blob/0.9.0.1/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java#L52),
>> and json-simple doesn't support reflection, which means it doesn't support custom classes.
>> 
>> In other words, Storm implicitly requires that the batch information be a Map, a String,
>> or some other type supported by json-simple. But no documentation mentions that.
>> 
>> And why not use Kryo, which is already widely used in Storm, to serialize/deserialize
>> the batch information?
>> 
>> -------------
>> Matt
>> 
>> 
On 6 Jan 2014, at 16:42, Edison <xeseo2005@gmail.com> wrote:
>> 
>>> check this out : https://github.com/wurstmeister/storm-kafka-0.8-plus
>>> 
>>> 
>>> 2014/1/1 yayj <yayjsir@gmail.com>
>>> Hi there,
>>> 
>>> I was recently going to implement a partitioned Trident spout for Kafka based on Storm
>>> 0.9.0.1, because storm-contrib/storm-kafka seems to be no longer developed and doesn't
>>> correspond to 0.9. But I found that my custom Emitter implementing
>>> storm.trident.spout.IPartitionedTridentSpout.Emitter didn't work.
>>> 
>>> The reason I found was that the lastPartitionMeta parameter of emitPartitionBatchNew
>>> was always null in EVERY transaction. That's not acceptable, since the JavaDoc of
>>> Emitter.emitPartitionBatchNew describes it as "returns the metadata that can be used to
>>> reconstruct this partition/batch in the future." (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java#L52)
>>> Furthermore, my implementation is quite similar to what storm-kafka does (https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/trident/KafkaUtils.java#L55).
>>> 
>>> In the end, I found the root cause in commit d6c2736 ("revamp trident spout and
>>> partitioned trident spouts to support spouts where the source can change"). In this
>>> commit, PartitionedTridentSpoutExecutor.Emitter.emitBatch() uses _savedCoordinatorMeta,
>>> which is returned by Coordinator.getPartitionsForBatch(), to check for partition
>>> changes, as described in the commit log. If they have changed, it FIRST FLUSHES
>>> _partitionStates (line 107: _partitionStates.clear()), which stores ALL partition
>>> states, and then invokes Emitter.getOrderedPartitions() to regenerate the partition
>>> information. (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L105)
>>> 
>>> Frankly speaking, I think it's too arbitrary. The semantics of the operation on
>>> _partitionStates become REPLACEMENT, but I think it should be an update, meaning adding
>>> new partitions, removing lost ones, and KEEPING STABLE ONES. I don't think it's logical
>>> that because one partition changed, all partitions in the same txid must reconstruct
>>> their states.
>>> 
>>> So, I'm wondering whether my understanding is correct. And would it be feasible to
>>> send a pull request to https://github.com/apache/incubator-storm ?
>>> 
>>> 
>>> -------------
>>> Matt
>>> 
>>> 
>>> 
>> 
>> 
> 
> 

