storm-user mailing list archives

From Ravi Sharma <ping2r...@gmail.com>
Subject Re: Storm using Kafka Spout gives error: IllegalStateException
Date Wed, 05 Dec 2018 07:22:14 GMT
Hi Saurabh,
I think there is an issue with the part of the code that is emitting the tuples.

If you want to use the ack mechanism, you need to anchor the emitted tuple to the
input tuple when you emit from bolts.

collector.emit(Tuple anchor, Values data)

Also make sure the Kafka spout emits tuples with a unique id.
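
For illustration, a minimal sketch of an explicitly anchored emit, assuming a bolt
that extends BaseRichBolt and keeps its OutputCollector (class name and the "value"
field here are only illustrative):

import org.apache.storm.task.{OutputCollector, TopologyContext}
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

class AnchoringBolt extends BaseRichBolt {
  private var collector: OutputCollector = _

  override def prepare(conf: java.util.Map[_, _], context: TopologyContext,
                       collector: OutputCollector): Unit =
    this.collector = collector

  override def execute(input: Tuple): Unit = {
    // emit anchored to the input so the new tuple joins the input's tuple tree
    collector.emit(input, new Values(input.getBinaryByField("value")))
    collector.ack(input)
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("value"))
}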

Thanks
Ravi


On Wed, 5 Dec 2018, 06:35 saurabh mimani <mimani.saurabh@gmail.com> wrote:

> Hey,
>
> Thanks for your reply. What you are saying is correct. However, I am able
> to reproduce it fairly often, and I think it happens when multiple tuples
> fail in the first run but all of them succeed on retry, or something of
> that sort.
>
> This can be reproduced easily using the following two bolts and the KafkaSpout,
> by running in cluster mode for 3-4 minutes:
>
> *BoltA*
>
> case class Abc(index: Int, rand: Boolean)
>
> class BoltA  extends BaseBasicBolt {
>
>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>     val inp = input.getBinaryByField("value").getObj[someObj]
>     val randomGenerator = new Random()
>
>     var i = 0
>     val rand = randomGenerator.nextBoolean()
>     1 to 100 foreach { _ =>
>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>       i += 1
>     }
>   }
>
>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>     declarer.declare(new Fields("boltAout"))
>   }
>
> }
>
> *BoltB*
>
> class BoltB  extends BaseBasicBolt {
>
>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>     println(s"Received ${abc.index}th tuple in BoltB")
>     if(abc.index >= 97 && abc.rand){
>       println(s"throwing FailedException for ${abc.index}th tuple")
>       throw new FailedException()
>     }
>   }
>
>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>   }
> }
>
> *KafkaSpout:*
>
> private def getKafkaSpoutConfig(source: Config) = KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     .setOffsetCommitPeriodMs(100)
>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>       10,
>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>     ))
>     .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
>     .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
>     .build()
>
> Other config:
>
> messageTimeoutInSecons: 300
>
> [image: Screenshot 2018-12-05 at 12.03.08 PM.png]
>
>
>
>
> Best Regards
>
> Saurabh Kumar Mimani
>
>
>
>
> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <stigdoessing@gmail.com>
> wrote:
>
>> Hi Saurabh,
>>
>> The tuple emitted by the spout will only be acked once all branches of
>> the tuple tree have been acked, i.e. all 100 tuples are acked.
>>
>> The error you're seeing was added as part of
>> https://issues.apache.org/jira/browse/STORM-2666 to try to avoid having
>> that bug pop up again. Could you try posting your spout configuration? Also
>> if possible, it would be helpful if you could enable debug logging for org.apache.storm.kafka.spout.KafkaSpout
>> and maybe also org.apache.storm.kafka.spout.internal.OffsetManager. They
>> log when offsets are committed (e.g.
>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546),
>> and also when the consumer position is changed (e.g.
>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561
>> ). It should be possible to track down when/why the consumer position wound
>> up behind the committed offset.
>>
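>> One way to get those loggers to DEBUG on the running topology, as a sketch
>> using Storm's dynamic log level CLI (assuming the topology is named
>> myTopology; the trailing number is a timeout in seconds):
>>
>> ./bin/storm set_log_level myTopology -l org.apache.storm.kafka.spout.KafkaSpout=DEBUG:600
>> ./bin/storm set_log_level myTopology -l org.apache.storm.kafka.spout.internal.OffsetManager=DEBUG:600
>>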
>> Just so you're aware, the check that crashes the spout has been removed
>> as of https://issues.apache.org/jira/browse/STORM-3102. I'd still like
>> to know if there's a bug in the spout causing it to emit tuples that were
>> already committed though.
>>
>> On Mon, 3 Dec 2018 at 11:29, saurabh mimani <mimani.saurabh@gmail.com> wrote:
>>
>>>
>>> Version Info:
>>>    "org.apache.storm" % "storm-core" % "1.2.1"
>>>    "org.apache.storm" % "storm-kafka-client" % "1.2.1"
>>>
>>> I have a Storm topology which looks like the following:
>>>
>>> boltA -> boltB -> boltC -> boltD
>>>
>>> boltA just does some formatting of requests and emits another tuple.
>>> boltB does some processing and emits around 100 tuples for each tuple
>>> it receives. boltC and boltD process these tuples. All the bolts
>>> implement BaseBasicBolt.
>>>
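>>> For reference, a rough sketch of that wiring (component names, groupings and
>>> the spout config helper are assumed for illustration):
>>>
>>> import org.apache.storm.kafka.spout.KafkaSpout
>>> import org.apache.storm.topology.TopologyBuilder
>>>
>>> val builder = new TopologyBuilder()
>>> builder.setSpout("kafkaSpout", new KafkaSpout(getKafkaSpoutConfig(source)))
>>> builder.setBolt("boltA", new BoltA()).shuffleGrouping("kafkaSpout")
>>> builder.setBolt("boltB", new BoltB()).shuffleGrouping("boltA")
>>> // boltC and boltD are attached the same way, each grouped on the previous bolt
>>>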
>>> What I am noticing is that whenever boltD marks some tuple as failed and
>>> marks it for retry by throwing FailedException, then after a few minutes
>>> (less than my topology message timeout) I get the following error:
>>>
>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>
>>> What seems to be happening is that this occurs when boltB emits 100 tuples
>>> for a single input tuple and boltD fails one of those 100 tuples; that is
>>> when I get this error. I am not able to understand how to fix this:
>>> ideally the original tuple should be acked only when all 100 tuples are
>>> acked, but probably the original tuple is acked before all those 100
>>> tuples are acked, which causes this error.
>>>
>>>
>>>
>>> Best Regards
>>>
>>> Saurabh Kumar Mimani
>>>
>>>
>>>
