storm-user mailing list archives

From Edison <xeseo2...@gmail.com>
Subject Re: PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null
Date Mon, 06 Jan 2014 08:42:42 GMT
Check this out: https://github.com/wurstmeister/storm-kafka-0.8-plus


2014/1/1 yayj <yayjsir@gmail.com>

> Hi there,
>
> I recently set out to implement a partitioned Trident spout for Kafka on
> storm 0.9.0.1, because storm-contrib/storm-kafka no longer seems to be
> actively developed and does not support 0.9. However, my custom Emitter
> implementing storm.trident.spout.IPartitionedTridentSpout.Emitter
> didn’t work.
>
> The reason turned out to be that the lastPartitionMeta parameter of
> emitPartitionBatchNew was null in EVERY transaction. That can’t be right,
> since the JavaDoc of Emitter.emitPartitionBatchNew says the method
> “returns the metadata that can be used to reconstruct this partition/batch
> in the future”, and that returned value is what should come back as
> lastPartitionMeta for the next batch of the same partition (
> https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java#L52).
> Furthermore, my implementation closely follows what storm-kafka does (
> https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/trident/KafkaUtils.java#L55
> ).
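To make the symptom concrete, here is a minimal, self-contained Java sketch of a Kafka-style emitter that decides where each batch starts from lastPartitionMeta. BatchMeta, OffsetEmitterSketch and the plain long offsets are hypothetical stand-ins, not storm-kafka classes; the real emitPartitionBatchNew also receives a TransactionAttempt, a TridentCollector and the partition, which are left out here. It shows why a lastPartitionMeta that is null on every call is a problem: each batch restarts from offset 0.

```java
import java.util.List;

// Hypothetical, simplified stand-in for the per-partition metadata an emitter
// returns from emitPartitionBatchNew (not the real storm-kafka metadata class).
final class BatchMeta {
    final long offset;      // first offset emitted in this batch
    final long nextOffset;  // offset the following batch should start from

    BatchMeta(long offset, long nextOffset) {
        this.offset = offset;
        this.nextOffset = nextOffset;
    }
}

// Hypothetical emitter logic: the start of a new batch depends only on the
// metadata returned for the previous batch of the same partition.
final class OffsetEmitterSketch {
    static BatchMeta emitPartitionBatchNew(BatchMeta lastPartitionMeta, long batchSize, List<Long> emitted) {
        // null is interpreted as "no previous batch", so start at the beginning.
        long start = (lastPartitionMeta == null) ? 0L : lastPartitionMeta.nextOffset;
        for (long o = start; o < start + batchSize; o++) {
            emitted.add(o); // stands in for collector.emit(...)
        }
        // This return value is what the executor should hand back next time.
        return new BatchMeta(start, start + batchSize);
    }
}
```

Fed its own return value, the sketch emits offsets 0, 1, 2 and then 3, 4, 5; fed null both times, it emits 0, 1, 2 twice, i.e. the same data is replayed in every transaction.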
>
> Eventually, I traced the root cause to commit d6c2736 (“revamp trident
> spout and partitioned trident spouts to support spouts where the source
> can change”). In this commit, PartitionedTridentSpoutExecutor.Emitter.emitBatch()
> compares the coordinator metadata returned by Coordinator.getPartitionsForBatch()
> with _savedCoordinatorMeta to detect partition changes, as described in
> the commit log. If a change is detected, it FIRST FLUSHES _partitionStates
> (line 107: _partitionStates.clear()), which holds the state of ALL
> partitions, and only then calls Emitter.getOrderedPartitions() to
> regenerate the partition information (
> https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L105
> ).
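The replace-on-change behaviour described above can be modelled roughly as follows. This is a simplified sketch, not the actual PartitionedTridentSpoutExecutor code: PartitionState and ReplacingExecutorSketch are hypothetical names, partitions are plain String ids, task assignment is ignored, and the change check is assumed to be an equals() comparison against the saved coordinator metadata.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-partition state: the partition id plus the metadata the
// emitter returned for that partition's last batch.
final class PartitionState {
    final String partitionId;
    Object lastPartitionMeta; // null until a batch has been emitted for this partition

    PartitionState(String partitionId) {
        this.partitionId = partitionId;
    }
}

// Simplified model of the check described in the message: when the coordinator
// metadata differs from the saved copy, EVERY partition state is discarded.
final class ReplacingExecutorSketch {
    private Object savedCoordinatorMeta;
    final Map<String, PartitionState> partitionStates = new HashMap<>();

    void onBatch(Object coordinatorMeta, List<String> orderedPartitions) {
        // Assumption: "changed" is decided with equals() on the coordinator metadata.
        boolean changed = savedCoordinatorMeta == null
                || !savedCoordinatorMeta.equals(coordinatorMeta);
        if (changed) {
            partitionStates.clear(); // drops lastPartitionMeta for all partitions
            for (String id : orderedPartitions) {
                partitionStates.put(id, new PartitionState(id));
            }
            savedCoordinatorMeta = coordinatorMeta;
        }
    }
}
```

Under that assumption there is a second way to hit the symptom: if the coordinator metadata type does not override equals() and a fresh instance arrives with each batch, Object.equals() falls back to identity, every batch looks like a change, and lastPartitionMeta is always null.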
>
> Frankly, I think this is too drastic. It turns the operation on
> _partitionStates into a REPLACEMENT, whereas I think it should be an
> UPDATE: adding new partitions, removing lost ones, and KEEPING STABLE
> ONES. It doesn’t seem logical that when one partition changes, all
> partitions in the same txid must reconstruct their states.
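For comparison, here is a minimal sketch of the UPDATE semantics proposed above, using the same kind of hypothetical, simplified state (MergedPartitionState and PartitionStateMerger are illustrative names, not Storm classes). Partitions that disappear are removed, new ones are added with no metadata, and stable ones keep their lastPartitionMeta.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical per-partition state: partition id plus the emitter metadata
// from that partition's last batch.
final class MergedPartitionState {
    final String partitionId;
    Object lastPartitionMeta; // null only for partitions that have never emitted

    MergedPartitionState(String partitionId) {
        this.partitionId = partitionId;
    }
}

// Sketch of update (merge) semantics instead of clear-and-rebuild.
final class PartitionStateMerger {
    static void update(Map<String, MergedPartitionState> states, List<String> orderedPartitions) {
        Set<String> current = new HashSet<>(orderedPartitions);
        // Remove partitions that are no longer present (keySet is a live view of the map).
        states.keySet().retainAll(current);
        // Add new partitions; existing entries and their metadata are left untouched.
        for (String id : orderedPartitions) {
            states.computeIfAbsent(id, MergedPartitionState::new);
        }
    }
}
```

Partition ordering and the per-task assignment that the real executor derives from getOrderedPartitions() are not modelled here; the point is only that a change to one partition leaves the others’ metadata intact.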
>
> So, is my understanding correct? And would it be appropriate to send a
> pull request to https://github.com/apache/incubator-storm ?
>
>
> -------------
> Matt
>
>
>
