Hi Saurabh,

The tuple emitted by the spout will only be acked once all branches of the tuple tree have been acked, i.e. all 100 tuples are acked.
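To illustrate the semantics, here is a toy model of when the spout tuple completes. This is my own sketch, not Storm's actual acker (which tracks the tree with XOR of tuple ids); it just models the rule that the spout's ack() fires only after every anchored child acks, and one fail fails the whole tree:

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of a tuple tree: NOT Storm's implementation.
class TupleTree {
    private final Set<Integer> pending = new HashSet<>();
    private boolean failed = false;

    void emitChild(int childId) { pending.add(childId); }  // anchored emit
    void ack(int childId)       { pending.remove(childId); }
    void fail(int childId)      { failed = true; }          // e.g. FailedException

    // The spout's ack() callback would fire only when this becomes true.
    boolean spoutTupleAcked()  { return !failed && pending.isEmpty(); }
    boolean spoutTupleFailed() { return failed; }
}

public class TupleTreeDemo {
    public static void main(String[] args) {
        TupleTree tree = new TupleTree();
        for (int i = 0; i < 100; i++) tree.emitChild(i); // boltB emits 100 children
        for (int i = 0; i < 99; i++) tree.ack(i);        // 99 succeed downstream
        System.out.println(tree.spoutTupleAcked());      // false: one child pending
        tree.fail(99);                                   // boltD fails the last one
        System.out.println(tree.spoutTupleFailed());     // true: whole tree replays
    }
}
```

So a single failed child out of the 100 is enough to fail (and later replay) the original spout tuple.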

The error you're seeing was added as part of https://issues.apache.org/jira/browse/STORM-2666 to try to avoid having that bug pop up again. Could you try posting your spout configuration? Also if possible, it would be helpful if you could enable debug logging for org.apache.storm.kafka.spout.KafkaSpout and maybe also org.apache.storm.kafka.spout.internal.OffsetManager. They log when offsets are committed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546), and also when the consumer position is changed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561 ). It should be possible to track down when/why the consumer position wound up behind the committed offset.
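In case it helps, the logger config could look roughly like this; the worker log4j2 file name, path, and appender name vary by install, so treat it as a sketch to adapt:

```xml
<!-- In the worker log4j2 config (e.g. log4j2/worker.xml), inside <Loggers>;
     adjust the AppenderRef to match an appender defined in your file. -->
<Logger name="org.apache.storm.kafka.spout.KafkaSpout" level="debug" additivity="false">
    <AppenderRef ref="A1"/>
</Logger>
<Logger name="org.apache.storm.kafka.spout.internal.OffsetManager" level="debug" additivity="false">
    <AppenderRef ref="A1"/>
</Logger>
```

If I remember the CLI correctly, you can also raise the level on a running topology without redeploying, e.g. `storm set_log_level yourTopologyName -l org.apache.storm.kafka.spout.KafkaSpout=DEBUG:300` (the trailing number being a timeout in seconds).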

Just so you're aware, the check that crashes the spout has been removed as of https://issues.apache.org/jira/browse/STORM-3102. I'd still like to know if there's a bug in the spout causing it to emit tuples that were already committed though.
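For context, the check that crashes the spout behaves roughly like this simplified sketch (my own approximation, not the actual emitOrRetryTuple code): under at-least-once, a record at or below the last committed offset should never come up for emission again, so hitting one is treated as an invariant violation:

```java
// Simplified sketch of the STORM-2666 guard: NOT the real KafkaSpout code.
public class CommittedOffsetGuard {
    private long committed = -1; // highest offset committed to Kafka so far

    void commit(long offset) { committed = Math.max(committed, offset); }

    void emitOrRetry(long recordOffset) {
        if (recordOffset <= committed) {
            // This is the condition that produced the error in your log.
            throw new IllegalStateException(
                "Attempting to emit a message that has already been committed.");
        }
        // ...otherwise emit the tuple as usual...
    }

    public static void main(String[] args) {
        CommittedOffsetGuard guard = new CommittedOffsetGuard();
        guard.commit(100);
        guard.emitOrRetry(101); // fine: ahead of the committed offset
        try {
            guard.emitOrRetry(50); // a record behind the commit shows up again
        } catch (IllegalStateException e) {
            System.out.println("IllegalStateException, as in the log above");
        }
    }
}
```

The interesting question is how the consumer position ended up behind the committed offset in the first place, which is what the debug logs should show.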

On Mon, Dec 3, 2018 at 11:29 AM saurabh mimani <mimani.saurabh@gmail.com> wrote:

Version Info: 
   "org.apache.storm" % "storm-core" % "1.2.1" 
   "org.apache.storm" % "storm-kafka-client" % "1.2.1" 

I have a storm topology which looks like following:

boltA -> boltB -> boltC -> boltD

boltA just does some formatting of requests and emits another tuple. boltB does some processing and emits around 100 tuples for each tuple it receives. boltC and boltD process these tuples. All the bolts implement BaseBasicBolt.

What I am noticing is that whenever boltD fails some tuple and marks it for retry by throwing FailedException, then after a few minutes (less than my topology timeout) I get the following error:

2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
2018-11-30T20:01:05.262+05:30 executor [ERROR]
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]

What seems to be happening is that when boltB emits 100 tuples for one input tuple and boltD fails one of those 100 tuples, I get this error. I am not able to understand how to fix this; ideally the original tuple should only be acked when all 100 tuples are acked, but the original tuple is probably being acked before all 100 of its children, which causes this error.

 


Best Regards

Saurabh Kumar Mimani