storm-user mailing list archives

From saurabh mimani <mimani.saur...@gmail.com>
Subject Re: Storm using Kafka Spout gives error: IllegalStateException
Date Sat, 22 Dec 2018 04:52:48 GMT
It did not get reproduced yesterday either, when I was doing my runs.

I see the last release of storm-kafka-client was in May 2018. It would be
great if we could release a new version of it, as I see there are many
commits after that.

I appreciate your efforts to provide a fix for this. Thanks a lot :)


Best Regards

Saurabh Kumar Mimani




On Thu, Dec 20, 2018 at 9:17 AM saurabh mimani <mimani.saurabh@gmail.com>
wrote:

> Thanks for the info. I have tried a few runs with the code you mentioned
> here
> <https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473>,
> putting that check back into 1.x-branch. I have not hit this exception so
> far; I will run it a few more times to be sure and let you know.
>
>
>
>
> Best Regards
>
> Saurabh Kumar Mimani
>
>
>
>
> On Thu, Dec 13, 2018 at 1:42 PM Stig Rohde Døssing <stigdoessing@gmail.com>
> wrote:
>
>> Sounds good. Keep in mind that the branch I linked doesn't perform the
>> check that was throwing the exception at all. If you want to be sure that
>> the bug is gone, you might want to take the branch and copy in the check
>> https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473
>> .
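>>
>> For reference, the check behaves roughly like this (a paraphrase of the
>> linked KafkaSpout.java#L465-L473 from memory, not the exact source; it
>> refuses to emit a record whose offset is below the last committed offset
>> for its partition):
>>
>> // Hedged sketch of the sanity check; names are approximate.
>> final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
>> if (isAtLeastOnceProcessing()
>>         && committedOffset != null
>>         && record.offset() < committedOffset.offset()) {
>>     throw new IllegalStateException("Attempting to emit a message that has"
>>         + " already been committed. This should never occur when using the"
>>         + " at-least-once processing guarantee.");
>> }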
>>
>> We removed it because it was causing performance issues with newer Kafka
>> versions. If the bug is still there, you won't get the exception anymore,
>> but the spout might emit already processed tuples. Let me know if you'd
>> like to test the spout with the check still in place, and I'll make a
>> branch that still has the check.
>>
>> I don't have a date for 1.2.3, but it could probably be pretty soon. If
>> you'd like to test the spout with the exception check still in place, it
>> would be good to do first, but otherwise we should be able to start the
>> release process soon.
>>
>> On Thu, Dec 13, 2018 at 7:51 AM saurabh mimani <
>> mimani.saurabh@gmail.com> wrote:
>>
>>> Hey Stig,
>>>
>>> Thanks for looking into this and providing a fix. We have built the
>>> storm-kafka-client jar from your branch, and it is working fine so far. We
>>> are still testing it and will let you know if we find any issues.
>>>
>>> I see that for STORM-3301 <https://issues.apache.org/jira/browse/STORM-3301>
>>> you have set the fix version to 1.2.3. Is there an approximate date for
>>> when this will be released?
>>>
>>> Thanks for your help :)
>>>
>>>
>>>
>>>
>>>
>>> Best Regards
>>>
>>> Saurabh Kumar Mimani
>>>
>>>
>>>
>>>
>>> On Mon, Dec 10, 2018 at 1:12 AM Stig Rohde Døssing <
>>> stigdoessing@gmail.com> wrote:
>>>
>>>> I'm assuming you applied the fix on top of 1.2.1 or something like
>>>> that? The exception can't be thrown from the branch I linked, since it was
>>>> removed in an earlier commit in 1.x-branch.
>>>>
>>>> Your logs show that the committed offset for partition 6 is 1682098 (98
>>>> for short). 98 was emitted, since it shows up in the emitted list. I'm
>>>> guessing it failed and was replayed. 99 and up are in the acked list, so
>>>> they are ready to commit as soon as 98 finishes processing.
>>>>
>>>> The log shows that 99 is the tuple encountering the exception, so I'm
>>>> guessing what happened is that 98 was acked and the spout decided to commit
>>>> 98, 99 etc. For some reason it then still decides to emit 99. The only
>>>> reasons I can think of (barring bugs in Kafka/the Kafka client) for that
>>>> to happen would be that 99 is in waitingToEmit and isn't being cleared out
>>>> during the commit (this is the bug I tried to fix), somehow 99 is still
>>>> queued for retry (this should not be possible) or for some reason the
>>>> consumer position ends up below the committed offset. I think the best bet
>>>> for tracking down why it happens would be logging the contents of the
>>>> RetryService, the contents of waitingToEmit and the consumer position both
>>>> after commitOffsetsForAckedTuples, and right before the exception is
>>>> thrown.
>>>>
>>>> Could you try logging those? I can add the log statements on top of the
>>>> bugfix if needed.
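>>>>
>>>> Concretely, I have something like this in mind (a hypothetical sketch;
>>>> field names such as retryService, waitingToEmit and kafkaConsumer are
>>>> from memory of the 1.x spout and may differ slightly on the branch):
>>>>
>>>> // Candidate debug logging, once right after commitOffsetsForAckedTuples()
>>>> // returns, and once right before the IllegalStateException is thrown:
>>>> LOG.debug("Retry service state: {}", retryService);
>>>> LOG.debug("Waiting to emit: {}", waitingToEmit);
>>>> LOG.debug("Consumer position for {}: {}", tp, kafkaConsumer.position(tp));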
>>>>
>>>> On Sun, Dec 9, 2018 at 6:42 PM saurabh mimani <
>>>> mimani.saurabh@gmail.com> wrote:
>>>>
>>>>> Hey, I see this is still happening. This time it seems to be because the
>>>>> same offset from a different partition was committed (guessing from the
>>>>> logs), but I am not sure, as that case should be handled.
>>>>>
>>>>> Please find the logs here
>>>>> <https://gist.github.com/mimani/ff27b7272482efc91c4d145d59ab59be>.
>>>>>
>>>>>
>>>>>
>>>>> Best Regards
>>>>>
>>>>> Saurabh Kumar Mimani
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde Døssing <
>>>>> stigdoessing@gmail.com> wrote:
>>>>>
>>>>>> I believe I have a fix, your logs were helpful. Please try out the
>>>>>> changes in https://github.com/apache/storm/pull/2923/files.
>>>>>>
>>>>>> On Sat, Dec 8, 2018 at 7:25 AM saurabh mimani <
>>>>>> mimani.saurabh@gmail.com> wrote:
>>>>>>
>>>>>>> Hey,
>>>>>>>
>>>>>>> Thanks for looking into this. I was not able to reproduce this earlier
>>>>>>> on my local machine; however, I will try once more. I was consistently
>>>>>>> able to reproduce it with a parallelism of 5 for boltA and a parallelism
>>>>>>> of 200 for boltB, on 2 machines in cluster mode.
>>>>>>>
>>>>>>> I will try again with your code once.
>>>>>>>
>>>>>>> These
>>>>>>> <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8>
>>>>>>> are the logs of the Kafka spout from when I was able to reproduce it in
>>>>>>> cluster mode with my topology, in case they help.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Saurabh Kumar Mimani
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <
>>>>>>> stigdoessing@gmail.com> wrote:
>>>>>>>
>>>>>>>> I can't reproduce this.
>>>>>>>>
>>>>>>>> I created a test topology similar to the code you posted, based on
>>>>>>>> the 1.2.1 release tag:
>>>>>>>> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564
>>>>>>>>
>>>>>>>> I set up a local Kafka instance and put enough messages in the test
>>>>>>>> topic to run the topology for 15 minutes or so. After populating the
>>>>>>>> topic, I started the topology and let it run until it reached the end
>>>>>>>> of the topic. As expected a lot of messages failed, but after a while
>>>>>>>> it managed to successfully process all messages. I didn't see any
>>>>>>>> worker crashes, and the logs only show some errors related to moving
>>>>>>>> files (expected on Windows).
>>>>>>>>
>>>>>>>> The topology seems to work fine against both Kafka 0.10.2.2 and
>>>>>>>> 1.1.1, though 1.1.1 is slower due to
>>>>>>>> https://issues.apache.org/jira/browse/STORM-3102.
>>>>>>>>
>>>>>>>> The Kafka setup was with the default configuration for both Kafka
>>>>>>>> and Zookeeper, and Storm was set up with a local Nimbus, a single
>>>>>>>> local Supervisor, and 4 worker slots.
>>>>>>>>
>>>>>>>> Saurabh, please try to reproduce the issue using the topology I
>>>>>>>> linked. If you need to make adjustments in order to provoke the issue,
>>>>>>>> please update the code and link it so I can check it out and try it.
>>>>>>>>
>>>>>>>> On Wed, Dec 5, 2018 at 4:42 PM saurabh mimani <
>>>>>>>> mimani.saurabh@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> No, I have checked that; there is no other consumer
>>>>>>>>> using the same consumer group.
>>>>>>>>>
>>>>>>>>> Thanks for looking into it, let me know if you need any
>>>>>>>>> other information.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <
>>>>>>>>> stigdoessing@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples
>>>>>>>>>> to the input tuple. It's intended for bolts that just need to
>>>>>>>>>> receive a tuple, synchronously process it, and emit some new tuples
>>>>>>>>>> anchored to the input tuple. It's there because doing manual acking
>>>>>>>>>> is tedious and error-prone in cases where you don't need to be able
>>>>>>>>>> to e.g. emit new unanchored tuples or ack the input tuple
>>>>>>>>>> asynchronously. As Peter mentioned, the BasicBoltExecutor (
>>>>>>>>>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42)
>>>>>>>>>> handles acking for you.
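>>>>>>>>>>
>>>>>>>>>> In outline it does something like the following (a simplified
>>>>>>>>>> sketch of the linked BasicBoltExecutor.execute, with error
>>>>>>>>>> reporting elided):
>>>>>>>>>>
>>>>>>>>>> public void execute(Tuple input) {
>>>>>>>>>>     collector.setContext(input);  // later emits are anchored to input
>>>>>>>>>>     try {
>>>>>>>>>>         bolt.execute(input, collector);
>>>>>>>>>>         collector.getOutputter().ack(input);   // auto-ack on success
>>>>>>>>>>     } catch (FailedException e) {
>>>>>>>>>>         collector.getOutputter().fail(input);  // auto-fail on FailedException
>>>>>>>>>>     }
>>>>>>>>>> }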
>>>>>>>>>>
>>>>>>>>>> Saurabh, I'll see if I can reproduce your issue. Please also
>>>>>>>>>> check that you don't have any other consumers using the same
>>>>>>>>>> consumer group as the spout.
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 5, 2018 at 11:53 AM Peter Chamberlain <
>>>>>>>>>> peter.chamberlain@htk.co.uk> wrote:
>>>>>>>>>>
>>>>>>>>>>> Pretty sure that the ack path is handled by BasicBoltExecutor
>>>>>>>>>>> (an implementation of IRichBolt), which calls
>>>>>>>>>>> collector.setContext(input), does the acking inside its execute
>>>>>>>>>>> function, and in between calls the BaseBasicBolt.execute version
>>>>>>>>>>> (which takes the collector as well as the tuple as parameters).
>>>>>>>>>>> So the intention is clearly that it is automatically anchored
>>>>>>>>>>> and acknowledged.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <ping2ravi@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>> I checked the BaseBasicBolt which comes with Storm; it doesn't
>>>>>>>>>>>> do much.
>>>>>>>>>>>> I also checked BasicOutputCollector and don't see how it will
>>>>>>>>>>>> anchor automatically unless you call
>>>>>>>>>>>> BasicOutputCollector.setContext(Tuple tuple). I don't see all of
>>>>>>>>>>>> your code, but I don't see this call in your boltA code.
>>>>>>>>>>>> It also looks like even when you make this setContext call, you
>>>>>>>>>>>> will then have to emit using the following emit function:
>>>>>>>>>>>>
>>>>>>>>>>>> BasicOutputCollector.emit(String streamId, List<Object> tuples)
>>>>>>>>>>>>
>>>>>>>>>>>> Basically, just check that whatever emit function is called, it
>>>>>>>>>>>> does pass the input tuple.
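>>>>>>>>>>>>
>>>>>>>>>>>> For example, with a plain IRichBolt both the anchored emit and
>>>>>>>>>>>> the ack are explicit, roughly like this (an illustrative sketch,
>>>>>>>>>>>> not code from this topology):
>>>>>>>>>>>>
>>>>>>>>>>>> public void execute(Tuple input) {
>>>>>>>>>>>>     // Passing the input tuple as the first argument anchors the
>>>>>>>>>>>>     // new tuple to the input tuple's tree.
>>>>>>>>>>>>     collector.emit(input, new Values("data"));
>>>>>>>>>>>>     collector.ack(input);  // with OutputCollector you ack yourself
>>>>>>>>>>>> }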
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I once had exactly the same issue for a few days, but mine was
>>>>>>>>>>>> related to config. I wanted to read from two Kafka topics in one
>>>>>>>>>>>> topology and had two different KafkaSpouts created; mistakenly I
>>>>>>>>>>>> copy-pasted the same config, and that caused this issue. Not sure
>>>>>>>>>>>> if that applies to your scenario.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> *NOTE*: I checked the latest Storm master branch for the code.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <
>>>>>>>>>>>> mimani.saurabh@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey Ravi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using *BaseBasicBolt*, which, as described here
>>>>>>>>>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>,
>>>>>>>>>>>>> works as follows: tuples emitted to BasicOutputCollector are
>>>>>>>>>>>>> automatically anchored to the input tuple, and the input tuple
>>>>>>>>>>>>> is acked for you automatically when the execute method
>>>>>>>>>>>>> completes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What you are saying is applicable to *BaseRichBolt*. The
>>>>>>>>>>>>> Kafka spout I am using is from the storm-kafka-client
>>>>>>>>>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1>
>>>>>>>>>>>>> library, so unique IDs etc. should already be taken care of.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <
>>>>>>>>>>>>> ping2ravi@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>>> I think there is an issue with the part of the code which is
>>>>>>>>>>>>>> emitting the tuples.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you want to use the ack mechanism, you need to anchor the
>>>>>>>>>>>>>> input tuple when you emit from bolts:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Collector.emit(Tuple input, Values data)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also make sure the Kafka spout emits tuples with a unique ID.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Ravi
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani <
>>>>>>>>>>>>>> mimani.saurabh@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hey,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for your reply. What you are saying is correct;
>>>>>>>>>>>>>>> however, I am able to reproduce it fairly often, and I think
>>>>>>>>>>>>>>> it happens when multiple tuples fail in the first run but all
>>>>>>>>>>>>>>> of them succeed on retry, something of that sort.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This can be reproduced easily using the following two bolts
>>>>>>>>>>>>>>> and the KafkaSpout, by running in cluster mode for 3-4 minutes:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *BoltA*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> class BoltA extends BaseBasicBolt {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     // Emit 100 tuples per input tuple; BasicOutputCollector anchors
>>>>>>>>>>>>>>>     // each emitted tuple to the input tuple automatically.
>>>>>>>>>>>>>>>     (0 until 100).foreach { i =>
>>>>>>>>>>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *BoltB*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> class BoltB extends BaseBasicBolt {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>>>>>>>>     println(s"Received ${abc.index}th tuple in BoltB")
>>>>>>>>>>>>>>>     // Fail the last few tuples of a batch whenever rand is true,
>>>>>>>>>>>>>>>     // forcing the spout to replay them.
>>>>>>>>>>>>>>>     if (abc.index >= 97 && abc.rand) {
>>>>>>>>>>>>>>>       println(s"throwing FailedException for ${abc.index}th tuple")
>>>>>>>>>>>>>>>       throw new FailedException()
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *KafkaSpout:*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> private def getKafkaSpoutConfig(source: Config) =
>>>>>>>>>>>>>>>   KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>>>>>>>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>>>>>>>>>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>>>>>>>     .setOffsetCommitPeriodMs(100)
>>>>>>>>>>>>>>>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>>>>       10,
>>>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>>>>>>>>     ))
>>>>>>>>>>>>>>>     .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
>>>>>>>>>>>>>>>     .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
>>>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Other config:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> messageTimeoutInSeconds: 300
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08 PM.png]
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <
>>>>>>>>>>>>>>> stigdoessing@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The tuple emitted by the spout will only be acked once all
>>>>>>>>>>>>>>>> branches of the tuple tree have been acked, i.e. all 100
>>>>>>>>>>>>>>>> tuples are acked.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The error you're seeing was added as part of
>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666 to try to
>>>>>>>>>>>>>>>> avoid having that bug pop up again. Could you try posting
>>>>>>>>>>>>>>>> your spout configuration? Also, if possible, it would be
>>>>>>>>>>>>>>>> helpful if you could enable debug logging for
>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout and maybe also
>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.internal.OffsetManager. They log
>>>>>>>>>>>>>>>> when offsets are committed (e.g.
>>>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546),
>>>>>>>>>>>>>>>> and also when the consumer position is changed (e.g.
>>>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561).
>>>>>>>>>>>>>>>> It should be possible to track down when/why the consumer
>>>>>>>>>>>>>>>> position wound up behind the committed offset.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Just so you're aware, the check that crashes the spout has
>>>>>>>>>>>>>>>> been removed as of
>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-3102. I'd still
>>>>>>>>>>>>>>>> like to know if there's a bug in the spout causing it to emit
>>>>>>>>>>>>>>>> tuples that were already committed, though.
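>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If redeploying with a changed log4j2 config is inconvenient,
>>>>>>>>>>>>>>>> you can also raise the log levels on the running topology with
>>>>>>>>>>>>>>>> Storm's set_log_level command (syntax from memory, so please
>>>>>>>>>>>>>>>> double-check; the topology name is a placeholder):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> storm set_log_level my_topology -l org.apache.storm.kafka.spout.KafkaSpout=DEBUG:3600
>>>>>>>>>>>>>>>> storm set_log_level my_topology -l org.apache.storm.kafka.spout.internal.OffsetManager=DEBUG:3600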
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 11:29 AM saurabh mimani <
>>>>>>>>>>>>>>>> mimani.saurabh@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Version Info:
>>>>>>>>>>>>>>>>>    "org.apache.storm"
% "storm-core" % "1.2.1"
>>>>>>>>>>>>>>>>>    "org.apache.storm"
% "storm-kafka-client" % "1.2.1"
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a Storm topology which looks like the following:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> boltA -> boltB -> boltC -> boltD
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> boltA just does some formatting of requests and emits
>>>>>>>>>>>>>>>>> another tuple. boltB does some processing and emits around
>>>>>>>>>>>>>>>>> 100 tuples for each tuple received. boltC and boltD process
>>>>>>>>>>>>>>>>> these tuples. All the bolts implement BaseBasicBolt.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What I am noticing is that whenever boltD marks some tuple
>>>>>>>>>>>>>>>>> as failed and marks it for retry by throwing a
>>>>>>>>>>>>>>>>> FailedException, then after a few minutes (less than my
>>>>>>>>>>>>>>>>> topology timeout) I get the following error:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
>>>>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
>>>>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What seems to be happening is that when boltB emits 100
>>>>>>>>>>>>>>>>> tuples from 1 input tuple and boltD fails one of those 100
>>>>>>>>>>>>>>>>> tuples, I get this error. I am not able to understand how to
>>>>>>>>>>>>>>>>> fix this; ideally the spout should ack the original tuple
>>>>>>>>>>>>>>>>> only when all 100 tuples are acked, but probably the original
>>>>>>>>>>>>>>>>> tuple is acked before all those 100 tuples are acked, which
>>>>>>>>>>>>>>>>> causes this error.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> *Peter Chamberlain* | Senior Software Engineer | HTK
>>>>>>>>>>
