storm-user mailing list archives

From yayj <yayj...@gmail.com>
Subject PartitionedTridentSpoutExecutor might cause lastPartitionMeta to always be null
Date Wed, 01 Jan 2014 09:20:22 GMT
Hi there,

I recently set out to implement a partitioned Trident spout for Kafka on Storm 0.9.0.1,
because storm-contrib/storm-kafka no longer seems to be actively developed and does not
target 0.9. However, my custom Emitter implementing storm.trident.spout.IPartitionedTridentSpout.Emitter
didn’t work.

As far as I could tell, the cause was that the lastPartitionMeta parameter of emitPartitionBatchNew
was always null in EVERY transaction. That can’t be intended, since the JavaDoc of Emitter.emitPartitionBatchNew
says it “returns the metadata that can be used to reconstruct this partition/batch
in the future.” (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java#L52)
So I expected the metadata returned for one batch to be handed back as lastPartitionMeta on that
partition’s next call. Furthermore, my implementation is essentially the same as what storm-kafka does
(https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/trident/KafkaUtils.java#L55).
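To make the expected contract concrete, here is a minimal Java sketch. It does not use the real Storm API: SimplePartitionEmitter, OffsetEmitter and MetaThreadingDemo are hypothetical names, and the partition metadata is reduced to a single Long offset. It only illustrates that when the returned metadata is passed back as lastPartitionMeta the emitter advances, whereas if it always arrives as null every batch starts over from offset 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for IPartitionedTridentSpout.Emitter.
// The real interface also takes a TransactionAttempt, a TridentCollector and
// a partition; here the "metadata" is just the next offset to read.
interface SimplePartitionEmitter {
    Long emitPartitionBatchNew(long txid, Long lastPartitionMeta);
}

// Mimics a storm-kafka style emitter: start at offset 0 when there is no
// previous metadata, otherwise continue where the last batch stopped.
class OffsetEmitter implements SimplePartitionEmitter {
    static final long BATCH_SIZE = 10;
    final List<Long> startOffsets = new ArrayList<Long>();

    @Override
    public Long emitPartitionBatchNew(long txid, Long lastPartitionMeta) {
        long start = (lastPartitionMeta == null) ? 0L : lastPartitionMeta;
        startOffsets.add(start);     // record where this batch began
        return start + BATCH_SIZE;   // metadata handed back to the executor
    }
}

// Hypothetical driver: the value returned for batch N should be passed back
// as lastPartitionMeta for batch N+1.
class MetaThreadingDemo {
    static List<Long> run(boolean keepMeta, int batches) {
        OffsetEmitter emitter = new OffsetEmitter();
        Long saved = null;
        for (long txid = 1; txid <= batches; txid++) {
            Long returned = emitter.emitPartitionBatchNew(txid, saved);
            // keepMeta == false imitates the reported symptom: the saved
            // metadata is lost, so the emitter always receives null.
            saved = keepMeta ? returned : null;
        }
        return emitter.startOffsets;
    }
}
```

With the metadata threaded through, MetaThreadingDemo.run(true, 3) yields start offsets [0, 10, 20]; with it lost, run(false, 3) yields [0, 0, 0].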

Finally, I traced the root cause to commit d6c2736 (“revamp trident spout and partitioned
trident spouts to support spouts where the source can change”). In this commit, PartitionedTridentSpoutExecutor.Emitter.emitBatch()
uses _savedCoordinatorMeta, the value returned by Coordinator.getPartitionsForBatch(), to detect
partition changes, as described in the commit log. If a change is detected, it FIRST CLEARS _partitionStates (line
107: _partitionStates.clear()), which stores ALL partition states, and then calls Emitter.getOrderedPartitions()
to regenerate the partition information. (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L105)
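A simplified Java model of the emitBatch() behaviour described above might look like the following. ReplacingExecutor is a hypothetical class, not Storm code: partition state is reduced to one Long per partition id, the coordinator metadata is assumed to be the list of partition ids, and the sketch detects a change with equals().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the "clear everything on change"
// logic. Assumption: the coordinator metadata is just the partition id list.
class ReplacingExecutor {
    private List<String> savedCoordinatorMeta;
    final Map<String, Long> partitionStates = new HashMap<String, Long>();

    void emitBatch(List<String> coordinatorMeta) {
        if (savedCoordinatorMeta == null || !savedCoordinatorMeta.equals(coordinatorMeta)) {
            // Any change discards the state of EVERY partition...
            partitionStates.clear();
            // ...and rebuilds entries for the new partition list with no metadata.
            for (String id : coordinatorMeta) {
                partitionStates.put(id, null);
            }
            savedCoordinatorMeta = coordinatorMeta;
        }
        // Emit one batch per partition; the "metadata" here is a batch counter.
        for (String id : coordinatorMeta) {
            Long last = partitionStates.get(id);
            partitionStates.put(id, last == null ? 1L : last + 1);
        }
    }
}
```

After two batches over [p0, p1] both partitions hold 2; a third batch over [p0, p1, p2] resets p0 and p1 to 1 even though those two partitions did not change.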

Frankly, I think this is too heavy-handed. The operation on _partitionStates effectively becomes
a REPLACEMENT, but I think it should be an UPDATE: adding new partitions, removing
lost ones, and KEEPING STABLE ONES. It doesn’t seem logical that when one partition
changes, all partitions in the same txid must reconstruct their states.
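The update semantics I have in mind could be sketched like this. Again, this is a hypothetical, simplified model rather than a patch against PartitionedTridentSpoutExecutor: PartitionStateReconciler and its Map<String, Long> of per-partition metadata are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of UPDATE semantics: reconcile stored partition states
// with a new partition list instead of clearing them all.
class PartitionStateReconciler {
    final Map<String, Long> partitionStates = new HashMap<String, Long>();

    void reconcile(List<String> orderedPartitions) {
        Set<String> current = new HashSet<String>(orderedPartitions);
        // Remove partitions that have disappeared (removal through keySet
        // is reflected in the backing map).
        partitionStates.keySet().retainAll(current);
        // Add new partitions with no metadata yet; stable ones keep theirs.
        for (String id : orderedPartitions) {
            if (!partitionStates.containsKey(id)) {
                partitionStates.put(id, null);
            }
        }
    }
}
```

Going from [p0, p1] to [p0, p2], p0 keeps its metadata, p1 is dropped and p2 starts with null, so only the partition that actually appeared has to rebuild its state.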

So, is my understanding correct? And would it be appropriate to send
a pull request to https://github.com/apache/incubator-storm ?


-------------
Matt


