storm-user mailing list archives

From saurabh mimani <mimani.saur...@gmail.com>
Subject Storm using Kafka Spout gives error: IllegalStateException
Date Mon, 03 Dec 2018 10:29:13 GMT
Version Info:
   "org.apache.storm" % "storm-core" % "1.2.1"
   "org.apache.storm" % "storm-kafka-client" % "1.2.1"

I have a Storm topology that looks like the following:

boltA -> boltB -> boltC -> boltD

boltA just formats requests and emits another tuple. boltB does some
processing and emits around 100 tuples for each tuple it receives.
boltC and boltD process these tuples. All the bolts extend
BaseBasicBolt.

What I am noticing is that whenever boltD fails a tuple and marks it
for retry by throwing FailedException, then after a few minutes (less than
my topology timeout) I get the following error:

2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
2018-11-30T20:01:05.262+05:30 executor [ERROR]
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]

This seems to happen when boltB emits 100 tuples from 1 input tuple and
boltD fails one of those 100 tuples. I am not able to understand how to
fix this. Ideally, the original tuple should be acked only when all 100
tuples are acked, but probably the original tuple is being acked before
all 100 tuples are acked, which causes this error.
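
The expected behaviour described above can be modelled with a small sketch
in plain Java. It does not use Storm's API and is not how Storm's acker is
implemented; the class AckTreeSketch and its methods are hypothetical names
used only for illustration. It assumes every emitted tuple is anchored to
the original spout tuple, and that a bolt acks its input only after it has
emitted its outputs (which is what BaseBasicBolt does automatically).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of a tuple tree. Not Storm code.
public class AckTreeSketch {
    public enum State { PENDING, ACKED, FAILED }

    private final Set<Long> pending = new HashSet<>(); // tuples not yet acked
    private State state = State.PENDING;
    private long nextId = 1L;

    // Registers a tuple anchored to this tree and returns its id.
    public long emit() {
        long id = nextId++;
        pending.add(id);
        return id;
    }

    // Acks one tuple; the tree completes only when nothing is pending.
    public void ack(long id) {
        if (state != State.PENDING) {
            return;
        }
        pending.remove(id);
        if (pending.isEmpty()) {
            state = State.ACKED;
        }
    }

    // Failing any tuple fails the whole tree, so the spout tuple is replayed.
    public void fail(long id) {
        if (state == State.PENDING) {
            state = State.FAILED;
        }
    }

    public State state() {
        return state;
    }

    public static void main(String[] args) {
        AckTreeSketch tree = new AckTreeSketch();
        long root = tree.emit();                 // tuple from the spout
        List<Long> children = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            children.add(tree.emit());           // boltB fan-out, anchored to root
        }
        tree.ack(root);                          // boltB acks its input after emitting
        for (int i = 0; i < 99; i++) {
            tree.ack(children.get(i));           // 99 of the 100 succeed downstream
        }
        System.out.println(tree.state());        // PENDING: one tuple outstanding
        tree.fail(children.get(99));             // boltD fails the last one
        System.out.println(tree.state());        // FAILED: whole tree must be replayed
    }
}
```

In this model the original tuple is never marked as acked while any of the
100 downstream tuples is still outstanding, and a single failure fails the
whole tree. That is the behaviour I would expect from the real topology.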



Best Regards

Saurabh Kumar Mimani
