storm-user mailing list archives

From Stig Rohde Døssing <stigdoess...@gmail.com>
Subject Re: Storm using Kafka Spout gives error: IllegalStateException
Date Sat, 08 Dec 2018 21:49:17 GMT
I believe I have a fix; your logs were helpful. Please try out the changes
in https://github.com/apache/storm/pull/2923/files.

On Sat, Dec 8, 2018 at 07:25 saurabh mimani <mimani.saurabh@gmail.com>
wrote:

> Hey,
>
> Thanks for looking into this. I was not able to reproduce this earlier on
> my local machine, but I will try once more. I was consistently able to
> reproduce it with a parallelism of 5 for boltA and a parallelism of 200 for
> boltB, on 2 machines in cluster mode.
>
> I will try again with your code.
>
> These <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8> are
> the Kafka Spout logs from when I was able to reproduce it in cluster mode
> with my topology, in case they help.
>
>
>
> Best Regards
>
> Saurabh Kumar Mimani
>
>
>
>
> On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <stigdoessing@gmail.com>
> wrote:
>
>> I can't reproduce this.
>>
>> I created a test topology similar to the code you posted, based on the
>> 1.2.1 release tag:
>> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564
>>
>> I set up a local Kafka instance and put enough messages in the test topic
>> to run the topology for 15 minutes or so. After populating the topic, I
>> started the topology and let it run until it reached the end of the
>> topic. As expected, a lot of messages failed, but after a while it
>> managed to successfully process all messages. I didn't see any worker
>> crashes, and the logs only show some errors related to moving files
>> (expected on Windows).
>>
>> The topology seems to work fine against both Kafka 0.10.2.2 and 1.1.1,
>> though 1.1.1 is slower due to
>> https://issues.apache.org/jira/browse/STORM-3102.
>>
>> The Kafka setup was with the default configuration for both Kafka and
>> Zookeeper, and Storm was set up with a local Nimbus, single local
>> Supervisor and 4 worker slots.
>>
>> Saurabh, please try to reproduce the issue using the topology I linked. If
>> you need to make adjustments in order to provoke the issue, please update
>> the code and link it so I can check it out and try it.
>>
>> On Wed, Dec 5, 2018 at 16:42 saurabh mimani <mimani.saurabh@gmail.com>
>> wrote:
>>
>>> No, I have checked that; there is no other consumer using the same
>>> consumer group.
>>>
>>> Thanks for looking into it; let me know if you need any other
>>> information.
>>>
>>>
>>>
>>> Best Regards
>>>
>>> Saurabh Kumar Mimani
>>>
>>>
>>>
>>>
>>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <stigdoessing@gmail.com>
>>> wrote:
>>>
>>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples to the
>>>> input tuple. It's intended for bolts that just need to receive a tuple,
>>>> synchronously process it, and emit some new tuples anchored to the input
>>>> tuple. It's there because doing manual acking is tedious and error-prone
>>>> in cases where you don't need to be able to e.g. emit new unanchored
>>>> tuples or ack the input tuple asynchronously. As Peter mentioned, the
>>>> BasicBoltExecutor (
>>>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42)
>>>> handles acking for you.
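>>>>
>>>> For contrast, here is a minimal sketch (a hypothetical bolt, not code
>>>> from this thread) of the manual anchoring and acking a BaseRichBolt
>>>> would have to do, all of which BaseBasicBolt handles for you:
>>>>
>>>> import java.util.{Map => JMap}
>>>> import org.apache.storm.task.{OutputCollector, TopologyContext}
>>>> import org.apache.storm.topology.OutputFieldsDeclarer
>>>> import org.apache.storm.topology.base.BaseRichBolt
>>>> import org.apache.storm.tuple.{Fields, Tuple, Values}
>>>>
>>>> class ManualAckBolt extends BaseRichBolt {
>>>>   private var collector: OutputCollector = _
>>>>
>>>>   override def prepare(conf: JMap[_, _], context: TopologyContext,
>>>>                        collector: OutputCollector): Unit = {
>>>>     this.collector = collector
>>>>   }
>>>>
>>>>   override def execute(input: Tuple): Unit = {
>>>>     try {
>>>>       // Anchor the emitted tuple to the input so downstream failures replay it.
>>>>       collector.emit(input, new Values("some value"))
>>>>       // Explicitly ack the input tuple once processing has succeeded.
>>>>       collector.ack(input)
>>>>     } catch {
>>>>       case _: Exception =>
>>>>         // Fail the input tuple so the spout retries it.
>>>>         collector.fail(input)
>>>>     }
>>>>   }
>>>>
>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>     declarer.declare(new Fields("out"))
>>>>   }
>>>> }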
>>>>
>>>> Saurabh, I'll see if I can reproduce your issue. Please also check that
>>>> you don't have any other consumers using the same consumer group as the
>>>> spout.
>>>>
>>>> Den ons. 5. dec. 2018 kl. 11.53 skrev Peter Chamberlain <
>>>> peter.chamberlain@htk.co.uk>:
>>>>
>>>>> Pretty sure that the ack path is handled by BasicBoltExecutor (an
>>>>> implementation of IRichBolt), which calls collector.setContext(input)
>>>>> and also does the acking inside its execute function, and in between
>>>>> calls the BaseBasicBolt.execute version (which takes the collector as
>>>>> well as the tuple as parameters).
>>>>> So the intention is clearly that it is automatically anchored and
>>>>> acknowledged.
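>>>>>
>>>>> In outline, that path looks roughly like this (a Scala paraphrase of
>>>>> BasicBoltExecutor's execute, not a verbatim copy of the Java source):
>>>>>
>>>>> import org.apache.storm.topology.{BasicOutputCollector, FailedException, IBasicBolt}
>>>>> import org.apache.storm.tuple.Tuple
>>>>>
>>>>> // Simplified paraphrase of BasicBoltExecutor's per-tuple path.
>>>>> class SimplifiedBasicBoltExecutor(bolt: IBasicBolt, collector: BasicOutputCollector) {
>>>>>   def execute(input: Tuple): Unit = {
>>>>>     // Point the collector at the input tuple so every emit is anchored to it.
>>>>>     collector.setContext(input)
>>>>>     try {
>>>>>       // Calls the BaseBasicBolt.execute version (tuple plus collector).
>>>>>       bolt.execute(input, collector)
>>>>>       // Ack the input tuple once execute returns normally.
>>>>>       collector.getOutputter.ack(input)
>>>>>     } catch {
>>>>>       case _: FailedException =>
>>>>>         // A FailedException fails the input tuple so it can be replayed.
>>>>>         collector.getOutputter.fail(input)
>>>>>     }
>>>>>   }
>>>>> }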
>>>>>
>>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>>
>>>>>> Hi Saurabh,
>>>>>> I checked the BaseBasicBolt which comes with Storm; it doesn't do
>>>>>> much.
>>>>>> I also checked BasicOutputCollector and don't see how it will anchor
>>>>>> automatically unless you call BasicOutputCollector.setContext(Tuple
>>>>>> tuple). I don't see all of your code, but I don't see this call in
>>>>>> your boltA code.
>>>>>> Also, it looks like even when you make this setContext call, you will
>>>>>> then have to emit using the following emit function:
>>>>>>
>>>>>> BasicOutputCollector.emit(String streamId, List<Object> tuple)
>>>>>>
>>>>>> Basically, just check that whatever emit function is called, it
>>>>>> passes the input tuple.
>>>>>>
>>>>>>
>>>>>> Once I had exactly the same issue for a few days, but mine was
>>>>>> related to config. I wanted to read from two Kafka topics in one
>>>>>> topology and had two different KafkaSpouts created; I mistakenly
>>>>>> copy-pasted the same config, and that caused this issue. Not sure if
>>>>>> that applies to your scenario.
>>>>>>
>>>>>>
>>>>>> *NOTE*: I checked the latest Storm master branch for the code.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <mimani.saurabh@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Ravi,
>>>>>>>
>>>>>>> I am using *BaseBasicBolt*, which, as described here
>>>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>,
>>>>>>> means: tuples emitted to BasicOutputCollector are automatically
>>>>>>> anchored to the input tuple, and the input tuple is acked for you
>>>>>>> automatically when the execute method completes.
>>>>>>>
>>>>>>> What you are saying is applicable to *BaseRichBolt*. The Kafka
>>>>>>> spout I am using is from the storm-kafka-client
>>>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1>
>>>>>>> library, so the unique ID, etc. should already be taken care of.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Saurabh Kumar Mimani
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <ping2ravi@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Saurabh,
>>>>>>>> I think there is an issue with the part of the code which is
>>>>>>>> emitting the tuples.
>>>>>>>>
>>>>>>>> If you want to use the ack mechanism, you need to use an anchor
>>>>>>>> tuple when you emit from bolts:
>>>>>>>>
>>>>>>>> Collector.emit(Tuple input, Values data)
>>>>>>>>
>>>>>>>> Also make sure the Kafka spout emits tuples with a unique id.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Ravi
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani <mimani.saurabh@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> Thanks for your reply. What you are saying is correct. However, I
>>>>>>>>> am able to reproduce it fairly often, and I think it happens when
>>>>>>>>> multiple tuples fail on the first run but all of them succeed on
>>>>>>>>> retry, something of that sort.
>>>>>>>>>
>>>>>>>>> This can be reproduced easily using the following two bolts and
>>>>>>>>> KafkaSpout, by running in cluster mode for 3-4 minutes:
>>>>>>>>>
>>>>>>>>> *BoltA*
>>>>>>>>>
>>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>>
>>>>>>>>> class BoltA extends BaseBasicBolt {
>>>>>>>>>
>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>>
>>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>>     // Emit 100 tuples; BasicOutputCollector anchors each to the input tuple.
>>>>>>>>>     (0 until 100).foreach { i =>
>>>>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> *BoltB*
>>>>>>>>>
>>>>>>>>> class BoltB extends BaseBasicBolt {
>>>>>>>>>
>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>>     println(s"Received ${abc.index}th tuple in BoltB")
>>>>>>>>>     // Fail the last few tuples of a batch (at random) to force retries.
>>>>>>>>>     if (abc.index >= 97 && abc.rand) {
>>>>>>>>>       println(s"throwing FailedException for ${abc.index}th tuple")
>>>>>>>>>       throw new FailedException()
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> *KafkaSpout:*
>>>>>>>>>
>>>>>>>>> private def getKafkaSpoutConfig(source: Config) =
>>>>>>>>>   KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>>>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>     .setOffsetCommitPeriodMs(100)
>>>>>>>>>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>       10,
>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>>     ))
>>>>>>>>>     .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
>>>>>>>>>     .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
>>>>>>>>>     .build()
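>>>>>>>>>
>>>>>>>>> For reference, the spout and bolts are wired together roughly like
>>>>>>>>> this (just a sketch: the component names, Config type, and build
>>>>>>>>> method are placeholders, with the parallelism figures mentioned
>>>>>>>>> elsewhere in this thread):
>>>>>>>>>
>>>>>>>>> import org.apache.storm.kafka.spout.KafkaSpout
>>>>>>>>> import org.apache.storm.topology.TopologyBuilder
>>>>>>>>>
>>>>>>>>> object TopologyWiring {
>>>>>>>>>   def build(source: Config): TopologyBuilder = {
>>>>>>>>>     val builder = new TopologyBuilder()
>>>>>>>>>     // Single spout executor feeding boltA (parallelism 5), then boltB (parallelism 200).
>>>>>>>>>     builder.setSpout("kafkaSpout", new KafkaSpout(getKafkaSpoutConfig(source)), 1)
>>>>>>>>>     builder.setBolt("boltA", new BoltA(), 5).shuffleGrouping("kafkaSpout")
>>>>>>>>>     builder.setBolt("boltB", new BoltB(), 200).shuffleGrouping("boltA")
>>>>>>>>>     builder
>>>>>>>>>   }
>>>>>>>>> }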
>>>>>>>>>
>>>>>>>>> Other config:
>>>>>>>>>
>>>>>>>>> messageTimeoutInSeconds: 300
>>>>>>>>>
>>>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08 PM.png]
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <
>>>>>>>>> stigdoessing@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>
>>>>>>>>>> The tuple emitted by the spout will only be acked once all
>>>>>>>>>> branches of the tuple tree have been acked, i.e. once all 100
>>>>>>>>>> tuples are acked.
>>>>>>>>>>
>>>>>>>>>> The error you're seeing was added as part of
>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666 to try to avoid
>>>>>>>>>> having that bug pop up again. Could you try posting your spout
>>>>>>>>>> configuration? Also, if possible, it would be helpful if you could
>>>>>>>>>> enable debug logging for org.apache.storm.kafka.spout.KafkaSpout and
>>>>>>>>>> maybe also org.apache.storm.kafka.spout.internal.OffsetManager.
>>>>>>>>>> They log when offsets are committed (e.g.
>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546)
>>>>>>>>>> and also when the consumer position is changed (e.g.
>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561).
>>>>>>>>>> It should be possible to track down when/why the consumer position
>>>>>>>>>> wound up behind the committed offset.
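>>>>>>>>>>
>>>>>>>>>> If it's easier, one way to turn on that logging without redeploying
>>>>>>>>>> (assuming your Storm version supports dynamic log levels; the
>>>>>>>>>> topology name below is a placeholder) is something like:
>>>>>>>>>>
>>>>>>>>>> storm set_log_level yourTopologyName -l org.apache.storm.kafka.spout.KafkaSpout=DEBUG:300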
>>>>>>>>>>
>>>>>>>>>> Just so you're aware, the check that crashes the spout has been
>>>>>>>>>> removed as of https://issues.apache.org/jira/browse/STORM-3102.
>>>>>>>>>> I'd still like to know if there's a bug in the spout causing it to
>>>>>>>>>> emit tuples that were already committed though.
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 3, 2018 at 11:29 saurabh mimani
>>>>>>>>>> <mimani.saurabh@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Version Info:
>>>>>>>>>>>    "org.apache.storm" % "storm-core" % "1.2.1"
>>>>>>>>>>>    "org.apache.storm" % "storm-kafka-client" % "1.2.1"
>>>>>>>>>>>
>>>>>>>>>>> I have a storm topology which looks like the following:
>>>>>>>>>>>
>>>>>>>>>>> boltA -> boltB -> boltC -> boltD
>>>>>>>>>>>
>>>>>>>>>>> boltA just does some formatting of requests and emits another
>>>>>>>>>>> tuple. boltB does some processing and emits around 100 tuples for
>>>>>>>>>>> each tuple received. boltC and boltD process these tuples. All
>>>>>>>>>>> the bolts implement BaseBasicBolt.
>>>>>>>>>>>
>>>>>>>>>>> What I am noticing is that whenever boltD marks some tuple as
>>>>>>>>>>> failed and marks it for retry by throwing FailedException, after
>>>>>>>>>>> a few minutes (less than my topology timeout) I get the following
>>>>>>>>>>> error:
>>>>>>>>>>>
>>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>
>>>>>>>>>>> What seems to be happening is that when boltB emits 100 tuples
>>>>>>>>>>> out of 1 tuple and boltD fails one of those 100 tuples, I get
>>>>>>>>>>> this error. I am not able to understand how to fix this; ideally
>>>>>>>>>>> it should ack the original tuple only when all 100 tuples are
>>>>>>>>>>> acked, but probably the original tuple is acked before all those
>>>>>>>>>>> 100 tuples are acked, which causes this error.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>
>>>>> --
>>>>> *Peter Chamberlain* | Senior Software Engineer | HTK
>>>>>
>>>>
