storm-user mailing list archives

From: Stig Rohde Døssing <stigdoess...@gmail.com>
Subject: Re: Storm using Kafka Spout gives error: IllegalStateException
Date: Mon, 03 Dec 2018 15:41:05 GMT
Hi Saurabh,

The tuple emitted by the spout will only be acked once all branches of the
tuple tree have been acked, i.e. all 100 tuples are acked.
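To illustrate (a minimal sketch only, with made-up field names, not your
actual code): with BaseBasicBolt every emit is automatically anchored to
the incoming tuple, so a fan-out bolt like your boltB adds 100 branches to
the spout tuple's tree, and the spout tuple is acked only after every one
of those branches is acked.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class FanOutBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String request = input.getStringByField("request"); // made-up field name
        for (int i = 0; i < 100; i++) {
            // Each emit is anchored to 'input' by the BasicOutputCollector,
            // adding another branch to the spout tuple's tree.
            collector.emit(new Values(request, i));
        }
        // When execute returns, BasicBoltExecutor acks 'input'; the spout
        // tuple itself is only acked once all branches are acked as well.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("request", "part"));
    }
}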

The error you're seeing comes from a check that was added as part of
https://issues.apache.org/jira/browse/STORM-2666 to catch that bug if it
ever resurfaced. Could you try posting your spout configuration? Also, if
possible, it would be helpful if you could enable debug logging for
org.apache.storm.kafka.spout.KafkaSpout and maybe also
org.apache.storm.kafka.spout.internal.OffsetManager. They log when offsets
are committed (e.g.
https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546)
and when the consumer position is changed (e.g.
https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561),
which should make it possible to track down when and why the consumer
position wound up behind the committed offset.
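
For reference, a typical at-least-once storm-kafka-client 1.2.x spout setup
looks roughly like the sketch below. This is only an illustration: the
bootstrap servers, topic, consumer group and retry settings are placeholders,
not values taken from your topology.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;

public class SpoutConfigSketch {
    public static KafkaSpout<String, String> buildSpout() {
        KafkaSpoutConfig<String, String> config =
            // Placeholder bootstrap servers and topic name.
            KafkaSpoutConfig.builder("kafka:9092", "my-topic")
                // Placeholder consumer group.
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
                // Start from the last committed offset, or earliest if none.
                .setFirstPollOffsetStrategy(
                    KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                // How often acked offsets are committed back to Kafka.
                .setOffsetCommitPeriodMs(10_000)
                // Exponential backoff retry for failed tuples.
                .setRetry(new KafkaSpoutRetryExponentialBackoff(
                    TimeInterval.microSeconds(500), TimeInterval.milliSeconds(2),
                    Integer.MAX_VALUE, TimeInterval.seconds(10)))
                .build();
        return new KafkaSpout<>(config);
    }
}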

Just so you're aware, the check that crashes the spout has been removed as
of https://issues.apache.org/jira/browse/STORM-3102. I'd still like to know
whether there's a bug in the spout causing it to emit tuples that were
already committed, though.

On Mon, 3 Dec 2018 at 11:29, saurabh mimani <mimani.saurabh@gmail.com> wrote:

>
> Version Info:
>    "org.apache.storm" % "storm-core" % "1.2.1"
>    "org.apache.storm" % "storm-kafka-client" % "1.2.1"
>
> I have a storm topology which looks like following:
>
> boltA -> boltB -> boltC -> boltD
>
> boltA just does some formatting of requests and emits another tuple. boltB
> does some processing and emits around 100 tuples for each tuple it receives.
> boltC and boltD process these tuples. All the bolts implement
> BaseBasicBolt.
>
> What I am noticing is that whenever boltD marks some tuple as failed and
> marks it for retry by throwing FailedException, then after a few minutes
> (less than my topology timeout) I get the following error:
>
> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>
> What seems to be happening is that when boltB emits its 100 tuples for 1
> incoming tuple and boltD fails one of those 100 tuples, I get this error.
> I am not able to understand how to fix this: ideally the original tuple
> should only be acked once all 100 tuples are acked, but probably the
> original tuple is acked before all those 100 tuples are acked, which
> causes this error.
>
>
>
> Best Regards
>
> Saurabh Kumar Mimani
>
>
>
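
For completeness, the retry path described above, where a BaseBasicBolt
signals a retry by throwing FailedException, looks roughly like the sketch
below. The bolt and its processing step are hypothetical, not the actual
boltD.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class RetryingBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            process(input); // hypothetical processing step
        } catch (Exception e) {
            // BasicBoltExecutor catches FailedException and fails the input
            // tuple instead of acking it, so the spout will eventually
            // replay the original Kafka record.
            throw new FailedException("processing failed, will retry", e);
        }
    }

    private void process(Tuple input) {
        // placeholder for the real work done in the failing bolt
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt in this sketch; no output fields declared
    }
}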
