storm-user mailing list archives

From saurabh mimani <mimani.saur...@gmail.com>
Subject Re: Storm using Kafka Spout gives error: IllegalStateException
Date Wed, 05 Dec 2018 15:41:10 GMT
No, I have checked that; there are no other consumers using the same
consumer group.

Thanks for looking into it, let me know if you need any other information.



Best Regards

Saurabh Kumar Mimani




On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <stigdoessing@gmail.com>
wrote:

> Ravi, BaseBasicBolt does automatically anchor any emitted tuples to the
> input tuple. It's intended for bolts that just need to receive a tuple,
> synchronously process it and emit some new tuples anchored to the input
> tuple. It's there because doing manual acking is tedious and error-prone in
> cases where you don't need to be able to e.g. emit new unanchored tuples or
> ack the input tuple asynchronously. As Peter mentioned, the
> BasicBoltExecutor (
> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42)
> handles acking for you.
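>
> For illustration, here is a minimal sketch of that contract (a
> hypothetical pass-through bolt; assumes storm-core 1.2.x on the
> classpath):
>
> import org.apache.storm.topology.base.BaseBasicBolt
> import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
> import org.apache.storm.tuple.{Fields, Tuple, Values}
>
> class PassThroughBolt extends BaseBasicBolt {
>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>     // No anchor argument and no ack call: the emitted tuple is anchored
>     // to `input` for us, and `input` is acked when execute returns
>     // normally (or failed if we throw FailedException).
>     collector.emit(new Values(input.getValue(0)))
>   }
>
>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>     declarer.declare(new Fields("out"))
>   }
> }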
>
> Saurabh, I'll see if I can reproduce your issue. Please also check that
> you don't have any other consumers using the same consumer group as the
> spout.
>
> On Wed, 5 Dec 2018 at 11:53, Peter Chamberlain <
> peter.chamberlain@htk.co.uk> wrote:
>
>> Pretty sure that the ack path is handled by BasicBoltExecutor (an
>> implementation of IRichBolt), which calls collector.setContext(input),
>> does the acking inside its execute function, and in between calls the
>> BaseBasicBolt.execute version (which takes the collector as well as the
>> tuple as parameters).
>> So the intention is clearly that it is automatically anchored and
>> acknowledged.
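>>
>> For reference, a Scala paraphrase of that control flow (a sketch, not
>> the actual source, which is Java; the class name and wiring here are
>> illustrative):
>>
>> import org.apache.storm.task.OutputCollector
>> import org.apache.storm.topology.{BasicOutputCollector, FailedException, IBasicBolt, ReportedFailedException}
>> import org.apache.storm.tuple.Tuple
>>
>> class BasicBoltExecutorSketch(bolt: IBasicBolt, out: OutputCollector) {
>>   private val collector = new BasicOutputCollector(out)
>>
>>   def execute(input: Tuple): Unit = {
>>     collector.setContext(input)      // later emits are anchored to `input`
>>     try {
>>       bolt.execute(input, collector) // the user's BaseBasicBolt logic
>>       out.ack(input)                 // acked automatically on success
>>     } catch {
>>       case e: FailedException =>
>>         if (e.isInstanceOf[ReportedFailedException]) collector.reportError(e)
>>         out.fail(input)              // failed automatically on FailedException
>>     }
>>   }
>> }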
>>
>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>
>>> Hi Saurabh,
>>> I checked the BaseBasicBolt which comes with Storm; it doesn't do much.
>>> I also checked BasicOutputCollector, and I don't see how it will anchor
>>> automatically unless you call BasicOutputCollector.setContext(Tuple
>>> tuple). I don't see all of your code, but I don't see this call in your
>>> boltA code.
>>> Also, it looks like even when you make this setContext call, you will
>>> then have to emit using the following emit function:
>>>
>>> BasicOutputCollector.emit(String streamId, List<Object> tuples)
>>>
>>> Basically, just check that whatever emit function is called, it passes
>>> the input tuple.
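>>>
>>> (Roughly, that emit forwards the stored context tuple as the anchor; a
>>> paraphrased sketch, not the actual source:)
>>>
>>> import java.util.{List => JList}
>>>
>>> import org.apache.storm.task.OutputCollector
>>> import org.apache.storm.tuple.Tuple
>>>
>>> class AnchoringCollectorSketch(out: OutputCollector) {
>>>   private var inputTuple: Tuple = _
>>>
>>>   def setContext(input: Tuple): Unit = inputTuple = input
>>>
>>>   // Anchors to whatever tuple was stored via setContext, which is why
>>>   // the context must be set before emitting.
>>>   def emit(streamId: String, tuple: JList[AnyRef]): JList[Integer] =
>>>     out.emit(streamId, inputTuple, tuple)
>>> }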
>>>
>>>
>>> I once had exactly the same issue for a few days, but mine was related
>>> to config. I wanted to read from two Kafka topics in one topology and
>>> had created two different KafkaSpouts; I mistakenly copy-pasted the same
>>> config, and that caused this issue. Not sure if that applies to your
>>> scenario.
>>>
>>>
>>> *NOTE*: I checked the latest storm master branch for code.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <mimani.saurabh@gmail.com>
>>> wrote:
>>>
>>>> Hey Ravi,
>>>>
>>>> I am using *BaseBasicBolt*, which, as described here
>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>,
>>>> works as follows: "Tuples emitted to BasicOutputCollector are
>>>> automatically anchored to the input tuple, and the input tuple is acked
>>>> for you automatically when the execute method completes."
>>>>
>>>> What you are saying is applicable for *BaseRichBolt*. The Kafka spout
>>>> I am using is from the storm-kafka-client
>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1>
>>>> library, so unique IDs, etc. should already be taken care of.
>>>>
>>>>
>>>>
>>>> Best Regards
>>>>
>>>> Saurabh Kumar Mimani
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <ping2ravi@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Saurabh,
>>>>> I think there is an issue with the part of the code which is emitting
>>>>> the tuples.
>>>>>
>>>>> If you want to use the ack mechanism, you need to anchor the input
>>>>> tuple when you emit from bolts:
>>>>>
>>>>> Collector.emit(Tuple input, Values data)
>>>>>
>>>>> Also make sure the Kafka spout emits tuples with a unique ID.
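>>>>>
>>>>> For example, a minimal sketch of manual anchoring and acking in a
>>>>> rich bolt (illustrative names; assumes storm-core 1.2.x):
>>>>>
>>>>> import java.util.{Map => JMap}
>>>>>
>>>>> import org.apache.storm.task.{OutputCollector, TopologyContext}
>>>>> import org.apache.storm.topology.OutputFieldsDeclarer
>>>>> import org.apache.storm.topology.base.BaseRichBolt
>>>>> import org.apache.storm.tuple.{Fields, Tuple, Values}
>>>>>
>>>>> class ManualAnchorBolt extends BaseRichBolt {
>>>>>   private var collector: OutputCollector = _
>>>>>
>>>>>   override def prepare(conf: JMap[_, _], context: TopologyContext, out: OutputCollector): Unit =
>>>>>     collector = out
>>>>>
>>>>>   override def execute(input: Tuple): Unit = {
>>>>>     // Pass `input` as the first argument so the new tuple is anchored
>>>>>     // to it, then ack the input explicitly once processing is done.
>>>>>     collector.emit(input, new Values(input.getValue(0)))
>>>>>     collector.ack(input)
>>>>>   }
>>>>>
>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
>>>>>     declarer.declare(new Fields("out"))
>>>>> }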
>>>>>
>>>>> Thanks
>>>>> Ravi
>>>>>
>>>>>
>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani <mimani.saurabh@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> Thanks for your reply. What you are saying is correct. However, I am
>>>>>> able to reproduce it more often, and I think it happens when multiple
>>>>>> tuples fail in the first run but all of them succeed on retry,
>>>>>> something of that sort.
>>>>>>
>>>>>> This can be reproduced easily using the following two bolts and
>>>>>> kafkaSpout, by running in cluster mode within 3-4 minutes:
>>>>>>
>>>>>> *BoltA*
>>>>>>
>>>>>> import org.apache.storm.topology.base.BaseBasicBolt
>>>>>> import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
>>>>>> import org.apache.storm.tuple.{Fields, Tuple, Values}
>>>>>>
>>>>>> import scala.util.Random
>>>>>>
>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>
>>>>>> class BoltA extends BaseBasicBolt {
>>>>>>
>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>     // getObj and getJsonBytes are our own (de)serialization helpers
>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>     val randomGenerator = new Random()
>>>>>>
>>>>>>     var i = 0
>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>     // emit 100 tuples for every input tuple
>>>>>>     1 to 100 foreach { _ =>
>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>       i += 1
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>   }
>>>>>>
>>>>>> }
>>>>>>
>>>>>> *BoltB*
>>>>>>
>>>>>> import org.apache.storm.topology.FailedException
>>>>>>
>>>>>> class BoltB extends BaseBasicBolt {
>>>>>>
>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>     println(s"Received ${abc.index}th tuple in BoltB")
>>>>>>     if (abc.index >= 97 && abc.rand) {
>>>>>>       println(s"throwing FailedException for ${abc.index}th tuple")
>>>>>>       // FailedException makes the executor fail the input tuple
>>>>>>       // instead of acking it
>>>>>>       throw new FailedException()
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> *KafkaSpout:*
>>>>>>
>>>>>> private def getKafkaSpoutConfig(source: Config) =
>>>>>>   KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>     .setOffsetCommitPeriodMs(100)
>>>>>>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>       10,
>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>     ))
>>>>>>     .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
>>>>>>     .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
>>>>>>     .build()
>>>>>>
>>>>>> Other config:
>>>>>>
>>>>>> messageTimeoutInSeconds: 300
>>>>>>
>>>>>> [image: Screenshot 2018-12-05 at 12.03.08 PM.png]
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best Regards
>>>>>>
>>>>>> Saurabh Kumar Mimani
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <
>>>>>> stigdoessing@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Saurabh,
>>>>>>>
>>>>>>> The tuple emitted by the spout will only be acked once all branches
>>>>>>> of the tuple tree have been acked, i.e. all 100 tuples are acked.
>>>>>>>
>>>>>>> The error you're seeing was added as part of
>>>>>>> https://issues.apache.org/jira/browse/STORM-2666 to try to avoid
>>>>>>> having that bug pop up again. Could you try posting your spout
>>>>>>> configuration? Also, if possible, it would be helpful if you could
>>>>>>> enable debug logging for org.apache.storm.kafka.spout.KafkaSpout and
>>>>>>> maybe also org.apache.storm.kafka.spout.internal.OffsetManager. They
>>>>>>> log when offsets are committed (e.g.
>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546),
>>>>>>> and also when the consumer position is changed (e.g.
>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561).
>>>>>>> It should be possible to track down when/why the consumer position
>>>>>>> wound up behind the committed offset.
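>>>>>>>
>>>>>>> (One way to enable that, as a sketch: add logger entries like the
>>>>>>> following inside the <Loggers> section of the worker log4j2 config,
>>>>>>> typically log4j2/worker.xml under the Storm installation on each
>>>>>>> supervisor node, then restart the workers. The exact file location
>>>>>>> depends on your setup.)
>>>>>>>
>>>>>>> <Logger name="org.apache.storm.kafka.spout.KafkaSpout" level="debug"/>
>>>>>>> <Logger name="org.apache.storm.kafka.spout.internal.OffsetManager" level="debug"/>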
>>>>>>>
>>>>>>> Just so you're aware, the check that crashes the spout has been
>>>>>>> removed as of https://issues.apache.org/jira/browse/STORM-3102. I'd
>>>>>>> still like to know if there's a bug in the spout causing it to emit
>>>>>>> tuples that were already committed, though.
>>>>>>>
>>>>>>> On Mon, 3 Dec 2018 at 11:29, saurabh mimani <
>>>>>>> mimani.saurabh@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Version Info:
>>>>>>>>    "org.apache.storm" % "storm-core" % "1.2.1"
>>>>>>>>    "org.apache.storm" % "storm-kafka-client" % "1.2.1"
>>>>>>>>
>>>>>>>> I have a storm topology which looks like the following:
>>>>>>>>
>>>>>>>> boltA -> boltB -> boltC -> boltD
>>>>>>>>
>>>>>>>> boltA just does some formatting of requests and emits another
>>>>>>>> tuple. boltB does some processing and emits around 100 tuples for
>>>>>>>> each tuple received. boltC and boltD process these tuples. All the
>>>>>>>> bolts extend BaseBasicBolt.
>>>>>>>>
>>>>>>>> What I am noticing is that whenever boltD marks some tuple as
>>>>>>>> failed and marks it for retry by throwing FailedException, after a
>>>>>>>> few minutes (less than my topology timeout), I get the following
>>>>>>>> error:
>>>>>>>>
>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>
>>>>>>>> What seems to be happening is this: when boltB emits 100 tuples
>>>>>>>> for 1 input tuple and boltD fails one of those 100 tuples, I get
>>>>>>>> this error. I am not able to understand how to fix this; ideally it
>>>>>>>> should ack the original tuple only when all 100 tuples are acked,
>>>>>>>> but probably the original tuple is acked before all those 100
>>>>>>>> tuples are acked, which causes this error.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>
>> --
>>
>>
>>
>> *Peter Chamberlain* | Senior Software Engineer | HTK
>>
>>
>
