storm-user mailing list archives

From saurabh mimani <mimani.saur...@gmail.com>
Subject Re: Storm using Kafka Spout gives error: IllegalStateException
Date Sat, 08 Dec 2018 06:25:00 GMT
Hey,

Thanks for looking into this. I was not able to reproduce this earlier on my
local setup, but I will try once more. I was consistently able to reproduce it
with a parallelism of 5 for boltA and a parallelism of 200 for boltB on 2
machines in cluster mode.

I will also try again with your code.

These <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8> are
the Kafka Spout logs from when I was able to reproduce it in cluster mode with
my topology, in case they help.



Best Regards

Saurabh Kumar Mimani




On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <stigdoessing@gmail.com>
wrote:

> I can't reproduce this.
>
> I created a test topology similar to the code you posted, based on the
> 1.2.1 release tag
> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564
> .
>
> I set up a local Kafka instance and put enough messages in the test topic
> to run the topology for 15 minutes or so. After populating the
> topic, I started the topology and let it run until it reached the end of
> the topic. As expected a lot of messages failed, but after a while it
> managed to successfully process all messages. I didn't see any worker
> crashes, and the logs only show some errors related to moving files
> (expected on Windows).
>
> The topology seems to work fine against both Kafka 0.10.2.2 and 1.1.1,
> though 1.1.1 is slower due to
> https://issues.apache.org/jira/browse/STORM-3102.
>
> The Kafka setup was with the default configuration for both Kafka and
> Zookeeper, and Storm was set up with a local Nimbus, single local
> Supervisor and 4 worker slots.
>
> Saurabh, please try to reproduce the issue using the topology I linked. If
> you need to make adjustments in order to provoke the issue, please update
> the code and link it so I can check it out and try it.
>
> On Wed, 5 Dec 2018 at 16:42, saurabh mimani <
> mimani.saurabh@gmail.com> wrote:
>
>> No, I have checked that; there are no other consumers using the same
>> consumer group.
>>
>> Thanks for looking into it, let me know if you need any other information.
>>
>>
>>
>> Best Regards
>>
>> Saurabh Kumar Mimani
>>
>>
>>
>>
>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <stigdoessing@gmail.com>
>> wrote:
>>
>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples to the
>>> input tuple. It's intended for bolts that just need to receive a tuple,
>>> synchronously process it and emit some new tuples anchored to the input
>>> tuple. It's there because doing manual acking is tedious and error-prone in
>>> cases where you don't need to be able to e.g. emit new unanchored tuples or
>>> ack the input tuple asynchronously. As Peter mentioned, the
>>> BasicBoltExecutor (
>>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42)
>>> handles acking for you.
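>>>
>>> For illustration, a minimal sketch (in Scala, to match the code further down
>>> this thread; the class and field names here are made up) of what the manual
>>> equivalent with BaseRichBolt would roughly look like:
>>>
>>> import java.util.{Map => JMap}
>>> import org.apache.storm.task.{OutputCollector, TopologyContext}
>>> import org.apache.storm.topology.OutputFieldsDeclarer
>>> import org.apache.storm.topology.base.BaseRichBolt
>>> import org.apache.storm.tuple.{Fields, Tuple, Values}
>>>
>>> class ManualAckBolt extends BaseRichBolt {
>>>   private var collector: OutputCollector = _
>>>
>>>   override def prepare(conf: JMap[_, _], context: TopologyContext,
>>>                        collector: OutputCollector): Unit = {
>>>     this.collector = collector
>>>   }
>>>
>>>   override def execute(input: Tuple): Unit = {
>>>     try {
>>>       // anchored emit: the emitted tuple joins the tuple tree rooted at `input`
>>>       collector.emit(input, new Values("some-value"))
>>>       collector.ack(input)                        // must ack explicitly
>>>     } catch {
>>>       case _: Exception => collector.fail(input)  // and fail explicitly on error
>>>     }
>>>   }
>>>
>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>     declarer.declare(new Fields("out"))
>>>   }
>>> }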
>>>
>>> Saurabh, I'll see if I can reproduce your issue. Please also check that
>>> you don't have any other consumers using the same consumer group as the
>>> spout.
>>>
>>> On Wed, 5 Dec 2018 at 11:53, Peter Chamberlain <
>>> peter.chamberlain@htk.co.uk> wrote:
>>>
>>>> Pretty sure that the ack path is handled by BasicBoltExecutor (an
>>>> implementation of IRichBolt), which calls collector.setContext(input), does
>>>> the acking inside its execute function, and in between calls the
>>>> BaseBasicBolt.execute version (which takes the collector as well as the
>>>> tuple as parameters).
>>>> So the intention is clearly that tuples are automatically anchored and
>>>> acknowledged.
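>>>>
>>>> Roughly, as a sketch in Scala rather than the actual Java source (the field
>>>> names below are made up; see the BasicBoltExecutor link above for the real
>>>> code), the wrapping does something like this:
>>>>
>>>> import org.apache.storm.task.OutputCollector
>>>> import org.apache.storm.topology.{BasicOutputCollector, FailedException, IBasicBolt, ReportedFailedException}
>>>> import org.apache.storm.tuple.Tuple
>>>>
>>>> class SketchBasicBoltExecutor(bolt: IBasicBolt,
>>>>                               rawCollector: OutputCollector,
>>>>                               basicCollector: BasicOutputCollector) {
>>>>   def execute(input: Tuple): Unit = {
>>>>     basicCollector.setContext(input)       // later emits are anchored to `input`
>>>>     try {
>>>>       bolt.execute(input, basicCollector)  // the BaseBasicBolt's execute runs here
>>>>       rawCollector.ack(input)              // acked automatically when execute returns
>>>>     } catch {
>>>>       case e: FailedException =>
>>>>         if (e.isInstanceOf[ReportedFailedException]) rawCollector.reportError(e)
>>>>         rawCollector.fail(input)           // a FailedException fails the input tuple
>>>>     }
>>>>   }
>>>> }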
>>>>
>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>
>>>>> Hi Saurabh,
>>>>> I checked the BaseBasicBolt that comes with Storm; it doesn't do much.
>>>>> I also checked BasicOutputCollector and don't see how it will anchor
>>>>> automatically unless you call BasicOutputCollector.setContext(Tuple
>>>>> tuple). I don't see all of your code, but I don't see this call in your
>>>>> boltA code.
>>>>> Also, it looks like even after you make this setContext call, you will
>>>>> have to emit using the following emit function:
>>>>>
>>>>> BasicOutputCollector.emit(String streamId, List<Object> tuples)
>>>>>
>>>>> Basically, just check that whichever emit function is called, it passes
>>>>> the input tuple.
>>>>>
>>>>>
>>>>> Once I had exactly the same issue for a few days, but mine was related
>>>>> to config. I wanted to read from two Kafka topics in one topology and
>>>>> had two different KafkaSpouts created; I mistakenly copy-pasted the same
>>>>> config, and that caused this issue. Not sure if that applies to your
>>>>> scenario.
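>>>>>
>>>>> For reference, a rough sketch of the shape that avoided it for me; the
>>>>> broker, topic and group names here are made up, the point is just that
>>>>> each spout gets its own topic and its own consumer group id:
>>>>>
>>>>> import org.apache.kafka.clients.consumer.ConsumerConfig
>>>>> import org.apache.storm.kafka.spout.KafkaSpoutConfig
>>>>>
>>>>> val spoutConfigA = KafkaSpoutConfig.builder("broker1:9092", "topicA")
>>>>>   .setProp(ConsumerConfig.GROUP_ID_CONFIG, "my-topology-topicA")
>>>>>   .build()
>>>>> val spoutConfigB = KafkaSpoutConfig.builder("broker1:9092", "topicB")
>>>>>   .setProp(ConsumerConfig.GROUP_ID_CONFIG, "my-topology-topicB")
>>>>>   .build()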
>>>>>
>>>>>
>>>>> *NOTE*: I checked the latest storm master branch for code.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <mimani.saurabh@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Ravi,
>>>>>>
>>>>>> I am using *BaseBasicBolt*, which, as described here
>>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>,
>>>>>> works like this: tuples emitted to BasicOutputCollector are automatically
>>>>>> anchored to the input tuple, and the input tuple is acked for you
>>>>>> automatically when the execute method completes.
>>>>>>
>>>>>> What you are saying is applicable to *BaseRichBolt*. The Kafka
>>>>>> spout I am using is from the storm-kafka-client
>>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1>
>>>>>> library, so the unique ID, etc. should already be taken care of.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best Regards
>>>>>>
>>>>>> Saurabh Kumar Mimani
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <ping2ravi@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Saurabh,
>>>>>>> I think there is an issue with the part of the code which is emitting
>>>>>>> the tuples.
>>>>>>>
>>>>>>> If you want to use the ack mechanism, you need to anchor to the input
>>>>>>> tuple when you emit from bolts:
>>>>>>>
>>>>>>> Collector.emit(Tuple input, Values data)
>>>>>>>
>>>>>>> Also, make sure the Kafka spout emits tuples with a unique id.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Ravi
>>>>>>>
>>>>>>>
>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani <mimani.saurabh@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>>
>>>>>>>> Thanks for your reply. What you are saying is correct; however, I am
>>>>>>>> able to reproduce it more often now, and I think it happens when
>>>>>>>> multiple tuples fail in the first run but all of them succeed on
>>>>>>>> retry, something of that sort.
>>>>>>>>
>>>>>>>> This can be reproduced easily using the following two bolts and
>>>>>>>> kafkaSpout, by running in cluster mode for 3-4 minutes:
>>>>>>>>
>>>>>>>> *BoltA*
>>>>>>>>
>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>
>>>>>>>> class BoltA  extends BaseBasicBolt {
>>>>>>>>
>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>
>>>>>>>>     var i = 0
>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>     1 to 100 foreach { _ =>
>>>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>>>       i += 1
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>   }
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> *BoltB*
>>>>>>>>
>>>>>>>> class BoltB  extends BaseBasicBolt {
>>>>>>>>
>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>     println(s"Received ${abc.index}th tuple in BoltB")
>>>>>>>>     if(abc.index >= 97 && abc.rand){
>>>>>>>>       println(s"throwing FailedException for ${abc.index}th tuple for")
>>>>>>>>       throw new FailedException()
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> *KafkaSpout:*
>>>>>>>>
>>>>>>>> private def getKafkaSpoutConfig(source: Config) =
>>>>>>>>   KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>     .setOffsetCommitPeriodMs(100)
>>>>>>>>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>       10,
>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>     ))
>>>>>>>>     .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
>>>>>>>>     .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
>>>>>>>>     .build()
>>>>>>>>
>>>>>>>> Other config:
>>>>>>>>
>>>>>>>> messageTimeoutInSeconds: 300
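>>>>>>>>
>>>>>>>> For context, a rough sketch (not my exact code; the names are illustrative,
>>>>>>>> the parallelism numbers are the ones I mentioned earlier) of how the spout,
>>>>>>>> the bolts and the message timeout are wired together:
>>>>>>>>
>>>>>>>> import org.apache.storm.{Config, StormSubmitter}
>>>>>>>> import org.apache.storm.kafka.spout.KafkaSpout
>>>>>>>> import org.apache.storm.topology.TopologyBuilder
>>>>>>>>
>>>>>>>> val builder = new TopologyBuilder()
>>>>>>>> // `source` is the same config object passed to getKafkaSpoutConfig above
>>>>>>>> builder.setSpout("kafkaSpout", new KafkaSpout(getKafkaSpoutConfig(source)), 1)
>>>>>>>> builder.setBolt("boltA", new BoltA, 5).shuffleGrouping("kafkaSpout")
>>>>>>>> builder.setBolt("boltB", new BoltB, 200).shuffleGrouping("boltA")
>>>>>>>>
>>>>>>>> val conf = new Config()
>>>>>>>> conf.setMessageTimeoutSecs(300)  // the message timeout mentioned above
>>>>>>>> StormSubmitter.submitTopology("repro-topology", conf, builder.createTopology())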
>>>>>>>>
>>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08 PM.png]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <
>>>>>>>> stigdoessing@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Saurabh,
>>>>>>>>>
>>>>>>>>> The tuple emitted by the spout will only be acked once all branches
>>>>>>>>> of the tuple tree have been acked, i.e. all 100 tuples are acked.
>>>>>>>>>
>>>>>>>>> The error you're seeing was added as part of
>>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666 to try to avoid
>>>>>>>>> having that bug pop up again. Could you try posting your spout
>>>>>>>>> configuration? Also, if possible, it would be helpful if you could
>>>>>>>>> enable debug logging for org.apache.storm.kafka.spout.KafkaSpout and
>>>>>>>>> maybe also org.apache.storm.kafka.spout.internal.OffsetManager.
>>>>>>>>> They log when offsets are committed (e.g.
>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546),
>>>>>>>>> and also when the consumer position is changed (e.g.
>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561).
>>>>>>>>> It should be possible to track down when/why the consumer position
>>>>>>>>> wound up behind the committed offset.
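>>>>>>>>>
>>>>>>>>> If it helps, one way to turn that on (assuming the stock log4j2 worker
>>>>>>>>> config; adjust to however your workers are configured) is to add
>>>>>>>>> loggers along these lines to log4j2/worker.xml on the worker machines:
>>>>>>>>>
>>>>>>>>> <!-- sketch: goes inside the <Loggers> section of log4j2/worker.xml -->
>>>>>>>>> <Logger name="org.apache.storm.kafka.spout.KafkaSpout" level="debug"/>
>>>>>>>>> <Logger name="org.apache.storm.kafka.spout.internal.OffsetManager" level="debug"/>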
>>>>>>>>>
>>>>>>>>> Just so you're aware, the check that crashes the spout has been
>>>>>>>>> removed as of https://issues.apache.org/jira/browse/STORM-3102.
>>>>>>>>> I'd still like to know if there's a bug in the spout causing it to
>>>>>>>>> emit tuples that were already committed, though.
>>>>>>>>>
>>>>>>>>> On Mon, 3 Dec 2018 at 11:29, saurabh mimani <
>>>>>>>>> mimani.saurabh@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Version Info:
>>>>>>>>>>    "org.apache.storm" % "storm-core" % "1.2.1"
>>>>>>>>>>    "org.apache.storm" % "storm-kafka-client" % "1.2.1"
>>>>>>>>>>
>>>>>>>>>> I have a Storm topology which looks like the following:
>>>>>>>>>>
>>>>>>>>>> boltA -> boltB -> boltC -> boltD
>>>>>>>>>>
>>>>>>>>>> boltA just does some formatting of requests and emits another
>>>>>>>>>> tuple. boltB does some processing and emits around 100 tuples
>>>>>>>>>> for each tuple it receives. boltC and boltD process these
>>>>>>>>>> tuples. All the bolts implement BaseBasicBolt.
>>>>>>>>>>
>>>>>>>>>> What I am noticing is that whenever boltD marks some tuple as
>>>>>>>>>> failed and marks it for retry by throwing FailedException, then
>>>>>>>>>> after a few minutes (less than my topology timeout) I get the
>>>>>>>>>> following error:
>>>>>>>>>>
>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>
>>>>>>>>>> What seems to be happening is that when boltB emits 100 tuples
>>>>>>>>>> out of 1 tuple and boltD fails one of those 100 tuples, I get
>>>>>>>>>> this error. I am not able to understand how to fix this; ideally
>>>>>>>>>> it should ack the original tuple only when all 100 tuples are
>>>>>>>>>> acked, but probably the original tuple is acked before all those
>>>>>>>>>> 100 tuples are acked, which causes this error.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>
>>>> --
>>>>
>>>>
>>>>
>>>> *Peter Chamberlain* | Senior Software Engineer | HTK
>>>>
>>>> T: +44(0)870 600 2311
>>>> Connect with me: Email <peter.chamberlain@htk.co.uk>
>>>>
>>>
