storm-user mailing list archives

From Matt <yayj...@gmail.com>
Subject Re: PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null
Date Mon, 13 Jan 2014 08:16:03 GMT
Hi Edison,

Yes, I have already resolved these problems. Thanks a lot for your help.

-------------
Matt


On 13 Jan 2014, at 4:08 PM, Edison <xeseo2005@gmail.com> wrote:

> Hi Matt,
> 
> For the serialization problem, I feel the same as you. They could use Kryo for better
> code reuse. I guess this function "just works" and the contributors haven't heard any
> strong complaints, so they have left it as it is, at a lower priority?
> 
> BTW, in Trident, the Emitter interface requires an implementation of
> "void refreshPartitions(List<Partition> partitionResponsibilities);", and the comments
> say it is used to manage things like connections to brokers.
> In the TridentKafkaEmitter implementation in the project I shared with you, the
> connection to the Kafka broker is closed every time the refreshPartitions method is
> called. As a result, the Kafka server log keeps filling up with junk messages about
> sockets being closed.
> 
> In the OpaquePartitionedTridentSpoutExecutor.emitBatch method, Storm clearly tries to
> "refresh" the partitions every time it updates the coordinator meta, but why does it
> have to do this? To avoid lost connections, or something else?
> Do you have any clue?
> 
> 
> 2014/1/13 Matt <yayjsir@gmail.com>
> Hi Edison,
> 
> Thanks for your reply. It does help, I think, because it works. But I am still curious
> about why, because _partitionStates is also flushed when using storm-kafka-0.8-plus.
> 
> In the end, I realized that the root cause was not the flushing of _partitionStates.
> The real reason is that I used my own custom class to represent lastPartitionMeta. My
> spout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Batch>, while
> storm-kafka-0.8-plus implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map>.
> 
> The last type argument (Batch/Map) is serialized and stored in ZooKeeper. Storm reads
> the last batch information from ZooKeeper if it’s not stored locally. But Storm
> serializes this instance with json-simple, which is hardcoded
> (https://github.com/nathanmarz/storm/blob/0.9.0.1/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java#L52),
> and json-simple doesn’t support reflection, which means it does not support custom
> classes.
> 
> In other words, Storm implicitly requires that the batch information be a Map, a
> String, or another type supported by json-simple. But no documentation mentions that.
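One way to live with that constraint is to flatten the custom metadata into a Map before returning it and rebuild it when it is handed back. The following is only a minimal sketch: BatchMeta, its fields, and the toMap/fromMap helpers are hypothetical names and are not part of Storm or json-simple. It assumes the stored values come back as some Number type after the JSON round trip, so it reads them through Number instead of casting to Long directly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical batch metadata class (not part of Storm).
final class BatchMeta {
    final long offset;      // first offset covered by the batch
    final long nextOffset;  // offset at which the next batch should start

    BatchMeta(long offset, long nextOffset) {
        this.offset = offset;
        this.nextOffset = nextOffset;
    }

    // Flatten into a Map that holds only strings and numbers, which a
    // plain JSON serializer can handle without reflection.
    Map<String, Object> toMap() {
        Map<String, Object> m = new HashMap<>();
        m.put("offset", offset);
        m.put("nextOffset", nextOffset);
        return m;
    }

    // Rebuild from a stored Map; a null input (no previous batch) stays null.
    static BatchMeta fromMap(Map<String, Object> m) {
        if (m == null) {
            return null;
        }
        long offset = ((Number) m.get("offset")).longValue();
        long nextOffset = ((Number) m.get("nextOffset")).longValue();
        return new BatchMeta(offset, nextOffset);
    }

    public static void main(String[] args) {
        BatchMeta restored = BatchMeta.fromMap(new BatchMeta(100L, 150L).toMap());
        System.out.println(restored.offset + " -> " + restored.nextOffset);
    }
}
```

With this shape, the spout would declare Map as its last type argument and convert at the edges of emitPartitionBatch, much as storm-kafka-0.8-plus declares Map directly.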
> 
> And why not use Kryo, which is already widely used in Storm, to serialize/deserialize
> the batch information?
> 
> -------------
> Matt
> 
> 
> On 6 Jan 2014, at 4:42 PM, Edison <xeseo2005@gmail.com> wrote:
> 
>> Check this out: https://github.com/wurstmeister/storm-kafka-0.8-plus
>> 
>> 
>> 2014/1/1 yayj <yayjsir@gmail.com>
>> Hi there,
>> 
>> I was recently going to implement a partitioned Trident spout for Kafka based on Storm
>> 0.9.0.1, because storm-contrib/storm-kafka no longer seems to be developed and is not
>> compatible with 0.9. But I found that my custom Emitter implementing
>> storm.trident.spout.IPartitionedTridentSpout.Emitter didn’t work.
>> 
>> The reason I found was that the lastPartitionMeta parameter of the method
>> emitPartitionBatchNew was always null in ANY transaction. That’s not acceptable, since
>> the JavaDoc of Emitter.emitPartitionBatchNew says it “returns the metadata that can be
>> used to reconstruct this partition/batch in the future.”
>> (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java#L52)
>> Furthermore, my implementation is similar to what storm-kafka does
>> (https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/trident/KafkaUtils.java#L55).
>> 
>> In the end, I found that the root cause lies in commit d6c2736 (revamp trident spout
>> and partitioned trident spouts to support spouts where the source can change). In this
>> commit, PartitionedTridentSpoutExecutor.Emitter.emitBatch() uses _savedCoordinatorMeta,
>> which is returned by Coordinator.getPartitionsForBatch(), to check whether the
>> partitions have changed, as described in the commit log. If they have changed, it FIRST
>> FLUSHES _partitionStates (line 107: _partitionStates.clear()), which stores ALL
>> partition states, and then invokes Emitter.getOrderedPartitions() to regenerate the
>> partition information.
>> (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L105)
>> 
>> Frankly speaking, I think it’s too arbitrary. The semantics of the operation on
>> _partitionStates become REPLACEMENT, but I think it should be an update, which means
>> adding new ones, removing lost ones, and KEEPING STABLE ONES. I don’t think it’s
>> logical that when one partition changes, all partitions in the same txid must
>> reconstruct their states.
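The "update instead of replace" semantics described above could be sketched roughly as follows. This is an illustration only, not the code in PartitionedTridentSpoutExecutor: PartitionStateMerge, merge, and the string partition ids are hypothetical, and the state type is left generic.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper (not part of Storm) showing an update-style merge.
final class PartitionStateMerge {

    // Builds the state map for the partitions that are currently live:
    // - partitions already known keep their existing state,
    // - new partitions get a fresh state from the factory,
    // - partitions that disappeared are simply not copied over.
    static <S> Map<String, S> merge(Map<String, S> current,
                                    Collection<String> livePartitionIds,
                                    Function<String, S> newState) {
        Map<String, S> merged = new LinkedHashMap<>();
        for (String id : livePartitionIds) {
            S existing = current.get(id);
            merged.put(id, existing != null ? existing : newState.apply(id));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>();
        current.put("p0", "state-p0");
        current.put("p1", "state-p1");
        // p0 is lost, p1 is stable, p2 is new.
        Map<String, String> merged =
                merge(current, Arrays.asList("p1", "p2"), id -> "fresh-" + id);
        System.out.println(merged);
    }
}
```

Compared with clearing the whole map, only the partitions that actually changed lose or gain state, so a stable partition would still find its previous metadata in the next transaction.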
>> 
>> So, I’m wondering whether my understanding is correct. And is it feasible to send a
>> pull request to https://github.com/apache/incubator-storm ?
>> 
>> 
>> -------------
>> Matt
>> 
>> 
>> 
> 
> 

