storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stig Rohde Døssing <stigdoess...@gmail.com>
Subject Re: Storm using Kafka Spout gives error: IllegalStateException
Date Thu, 13 Dec 2018 08:12:28 GMT
Sounds good. Keep in mind that the branch I linked doesn't perform the
check that was throwing the exception at all. If you want to be sure that
the bug is gone, you might want to take the branch and copy in the check
https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473
.

We removed it because it was causing performance issues with newer Kafka
versions. If the bug is still there, you won't get the exception anymore,
but the spout might emit already processed tuples. Let me know if you'd
like to test the spout with the check still in place, and I'll make a
branch that still has the check.

I don't have a date for 1.2.3, but it could probably be pretty soon. If
you'd like to test the spout with the exception check still in place, it
would be good to do first, but otherwise we should be able to start the
release process soon.

Den tor. 13. dec. 2018 kl. 07.51 skrev saurabh mimani <
mimani.saurabh@gmail.com>:

> Hey Stig,
>
> Thanks for looking into this and providing a fix. We have build the
> storm-kafka-client jar from your branch and it is working fine on that. We
> are still testing it and will let you know if there are any issues we fix.
>
> I see for STORM-3301 <https://issues.apache.org/jira/browse/STORM-3301>,
> you have put the fix version as 1.2.3, any approximate date when will this
> be released?
>
> Thanks for your help :)
>
>
>
>
>
> Best Regards
>
> Saurabh Kumar Mimani
>
>
>
>
> On Mon, Dec 10, 2018 at 1:12 AM Stig Rohde Døssing <stigdoessing@gmail.com>
> wrote:
>
>> I'm assuming you applied the fix on top of 1.2.1 or something like that?
>> The exception can't be thrown from the branch I linked, since it was
>> removed in an earlier commit in 1.x-branch.
>>
>> Your logs show that the committed offset for partition 6 is 1682098 (98
>> for short). 98 was emitted, since it shows up in the emitted list. I'm
>> guessing it failed and was replayed. 99 and up are in the acked list, so
>> they are ready to commit as soon as 98 finishes processing.
>>
>> The log shows that 99 is the tuple encountering the exception, so I'm
>> guessing what happened is that 98 was acked and the spout decided to commit
>> 98, 99 etc. For some reason it then still decides to emit 99. The only
>> reasons I can think of (barring bugs in Kafka/the Kafka client) for that to
>> happen would be that 99 is in waitingToEmit and isn't being cleared out
>> during the commit (this is the bug I tried to fix), somehow 99 is still
>> queued for retry (this should not be possible) or for some reason the
>> consumer position ends up below the committed offset. I think the best bet
>> for tracking down why it happens would be logging the contents of the
>> RetryService, the contents of waitingToEmit and the consumer position both
>> after commitOffsetsForAckedTuples, and right before the exception is
>> thrown.
>>
>> Could you try logging those? I can add the log statements on top of the
>> bugfix if needed.
>>
>> Den søn. 9. dec. 2018 kl. 18.42 skrev saurabh mimani <
>> mimani.saurabh@gmail.com>:
>>
>>> Hey, I see this is still happening, this time, it seems, as it seemed to
>>> me, because same offset from different partition was committed(guessing
>>> from logs), but not sure as that should be handled.
>>>
>>> Please find the logs here
>>> <https://gist.github.com/mimani/ff27b7272482efc91c4d145d59ab59be>.
>>>
>>>
>>>
>>> Best Regards
>>>
>>> Saurabh Kumar Mimani
>>>
>>>
>>>
>>>
>>> On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde Døssing <
>>> stigdoessing@gmail.com> wrote:
>>>
>>>> I believe I have a fix, your logs were helpful. Please try out the
>>>> changes in https://github.com/apache/storm/pull/2923/files.
>>>>
>>>> Den lør. 8. dec. 2018 kl. 07.25 skrev saurabh mimani <
>>>> mimani.saurabh@gmail.com>:
>>>>
>>>>> Hey,
>>>>>
>>>>> Thanks for looking into this. I was not able to produce this earlier
>>>>> on my local, however I will again try once. I was consistently able to
>>>>> reproduce it with parallelism of 5 for boltA and parallelism of 200 with
>>>>> boltB on 2 machines in cluster mode.
>>>>>
>>>>> I will try again with your code once.
>>>>>
>>>>> These
>>>>> <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8>
are
>>>>> logs of Kafka Spout, when I was able to reproduce it in cluster mode
with
>>>>> my topology, in case these helps.
>>>>>
>>>>>
>>>>>
>>>>> Best Regards
>>>>>
>>>>> Saurabh Kumar Mimani
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <
>>>>> stigdoessing@gmail.com> wrote:
>>>>>
>>>>>> I can't reproduce this.
>>>>>>
>>>>>> I created a test topology similar to the code you posted, based on
>>>>>> the 1.2.1 release tag
>>>>>> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564
>>>>>> .
>>>>>>
>>>>>> I set up a local Kafka instance and put enough messages to run the
>>>>>> topology for 15 minutes or so in the test topic. After populating
the
>>>>>> topic, I started the topology and let it run until it reached the
end of
>>>>>> the topic. As expected a lot of messages failed, but after a while
it
>>>>>> managed to successfully process all messages. I didn't see any worker
>>>>>> crashes, and the logs only show some errors related to moving files
>>>>>> (expected on Windows).
>>>>>>
>>>>>> The topology seems to work fine against both Kafka 0.10.2.2 and
>>>>>> 1.1.1, though 1.1.1 is slower due to
>>>>>> https://issues.apache.org/jira/browse/STORM-3102.
>>>>>>
>>>>>> The Kafka setup was with the default configuration for both Kafka
and
>>>>>> Zookeeper, and Storm was set up with a local Nimbus, single local
>>>>>> Supervisor and 4 worker slots.
>>>>>>
>>>>>> Saurabh please try to reproduce the issue using the topology I
>>>>>> linked. If you need to make adjustments in order to provoke the issue,
>>>>>> please update the code and link it so I can check it out and try
it.
>>>>>>
>>>>>> Den ons. 5. dec. 2018 kl. 16.42 skrev saurabh mimani <
>>>>>> mimani.saurabh@gmail.com>:
>>>>>>
>>>>>>> No, I have checked that, there is no other consumer group consuming
>>>>>>> from the same.
>>>>>>>
>>>>>>> Thanks for looking into it, let me know if you need any
>>>>>>> other information.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Saurabh Kumar Mimani
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <
>>>>>>> stigdoessing@gmail.com> wrote:
>>>>>>>
>>>>>>>> Ravi, BaseBasicBolt does automatically anchor any emitted
tuples to
>>>>>>>> the input tuple. It's intended for bolts that just need to
receive a tuple,
>>>>>>>> synchronously process it and emit some new tuples anchored
to the input
>>>>>>>> tuple. It's there because doing manual acking is tedious
and error-prone in
>>>>>>>> cases where you don't need to be able to e.g. emit new unachored
tuples or
>>>>>>>> ack the input tuple asynchronously. As Peter mentioned, the
>>>>>>>> BasicBoltExecutor (
>>>>>>>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42)
>>>>>>>> handles acking for you.
>>>>>>>>
>>>>>>>> Saurabh, I'll see if I can reproduce your issue. Please also
check
>>>>>>>> that you don't have any other consumers using the same consumer
group as
>>>>>>>> the spout.
>>>>>>>>
>>>>>>>> Den ons. 5. dec. 2018 kl. 11.53 skrev Peter Chamberlain <
>>>>>>>> peter.chamberlain@htk.co.uk>:
>>>>>>>>
>>>>>>>>> Pretty sure that the ack path is handled by BasicBoltExecutor
(an
>>>>>>>>> implmentation of IRichBolt), which calls collector.setContext(input),
and
>>>>>>>>> also does the acking inside it's execute function, and
in between calls the
>>>>>>>>> BaseBasicBolt.execute version (which takes the collector
as well as the
>>>>>>>>> tuple as parameters).
>>>>>>>>> So the intention is clearly that it is automatically
anchored and
>>>>>>>>> acknowledged.
>>>>>>>>>
>>>>>>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <ping2ravi@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Saurabh,
>>>>>>>>>> I checked the BaseBasicBolt which comes with storm,
it doesn't do
>>>>>>>>>> much.
>>>>>>>>>> Also checked BasicOutputCollector and don't see how
it will
>>>>>>>>>> anchor automatically unless you call
>>>>>>>>>>  BasicOutputCollector.setContext(Tuple tuple), don't
see all of your code,
>>>>>>>>>> but don't see this call in your boltA code.
>>>>>>>>>> Also it looks like even when you make this setContext
call, after
>>>>>>>>>> that you will have to emit using following emit function
>>>>>>>>>>
>>>>>>>>>> BasicOutputCollector.emit(String streamId, List<Object>
tuples)
>>>>>>>>>>
>>>>>>>>>> Basically just check that whatever emit function
is called, it
>>>>>>>>>> does pass the input tuple.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Once I had exactly same issue for few days, but mine
was related
>>>>>>>>>> to config. I wanted to read from two Kafka topics
in one topology and had
>>>>>>>>>> two different kafkaspout created, mistakenly I copy
pasted same config and
>>>>>>>>>> that caused this issue. Not sure if that applies
to your scenario.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *NOTE*: I checked the latest storm master branch
for code.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <
>>>>>>>>>> mimani.saurabh@gmail.com wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Ravi,
>>>>>>>>>>>
>>>>>>>>>>> I am using *BaseBasicBolt*, which as described
here
>>>>>>>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>
>>>>>>>>>>> : Tuples emitted to BasicOutputCollector are
automatically
>>>>>>>>>>> anchored to the input tuple, and the input tuple
is acked for you
>>>>>>>>>>> automatically when the execute method completes.
>>>>>>>>>>>
>>>>>>>>>>> What you are saying is applicable for *BaseRichBolt.
*The Kafka
>>>>>>>>>>> spout I am using is from storm-kafka-client
>>>>>>>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1>
>>>>>>>>>>> library, so unique ID, etc should be already
taken care of.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <ping2ravi@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>> I think there is issue with part of code
which is emitting the
>>>>>>>>>>>> tuples.
>>>>>>>>>>>>
>>>>>>>>>>>> If you want to use ack mechanism, you need
to use anchor tuple
>>>>>>>>>>>> when you emit from bolts.
>>>>>>>>>>>>
>>>>>>>>>>>> Collector.emit(Tuple input, Values data)
>>>>>>>>>>>>
>>>>>>>>>>>> Also make sure Kafka spout emits tuple with
a unique id.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Ravi
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani
<
>>>>>>>>>>>> mimani.saurabh@gmail.com wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your reply. What you are saying
is correct, However
>>>>>>>>>>>>> I am able to reproduce it more often
and I think it happens when multiple
>>>>>>>>>>>>> tuples gets failed in first run but all
of those gets success on retry,
>>>>>>>>>>>>> something of that sort is happening.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This can be reproduced using following
two bolts and
>>>>>>>>>>>>> kafkaSpout easily, by running in cluster
more with 3/4 minutes:
>>>>>>>>>>>>>
>>>>>>>>>>>>> *BoltA*
>>>>>>>>>>>>>
>>>>>>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>>>>>>
>>>>>>>>>>>>> class BoltA  extends BaseBasicBolt {
>>>>>>>>>>>>>
>>>>>>>>>>>>>   override def execute(input: Tuple,
collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>>>>>>
>>>>>>>>>>>>>     var i = 0
>>>>>>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>>>>>>     1 to 100 foreach {
>>>>>>>>>>>>>       collector.emit(new Values(Abc(i,
rand).getJsonBytes))
>>>>>>>>>>>>>       i += 1
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>
>>>>>>>>>>>>>   override def declareOutputFields(declarer:
OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> *BoltB*
>>>>>>>>>>>>>
>>>>>>>>>>>>> class BoltB  extends BaseBasicBolt {
>>>>>>>>>>>>>
>>>>>>>>>>>>>   override def execute(input: Tuple,
collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>>>>>>     println(s"Received ${abc.index}th
tuple in BoltB")
>>>>>>>>>>>>>     if(abc.index >= 97 &&
abc.rand){
>>>>>>>>>>>>>       println(s"throwing FailedException
for ${abc.index}th tuple for")
>>>>>>>>>>>>>       throw new FailedException()
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>
>>>>>>>>>>>>>   override def declareOutputFields(declarer:
OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>>   }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> *KafkaSpout:*
>>>>>>>>>>>>>
>>>>>>>>>>>>> private def getKafkaSpoutConfig(source:
Config) = KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list",
"queueName")
>>>>>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG,
"grp")
>>>>>>>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>>>>>     .setOffsetCommitPeriodMs(100)
>>>>>>>>>>>>>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>>       10,
>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>>>>>>     ))
>>>>>>>>>>>>>     .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy",
"UNCOMMITTED_EARLIEST")))
>>>>>>>>>>>>>     .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset",
10000))
>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>
>>>>>>>>>>>>> Other config:
>>>>>>>>>>>>>
>>>>>>>>>>>>> messageTimeoutInSecons: 300
>>>>>>>>>>>>>
>>>>>>>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08
PM.png]
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde
Døssing <
>>>>>>>>>>>>> stigdoessing@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The tuple emitted by the spout will
only be acked once all
>>>>>>>>>>>>>> branches of the tuple tree have been
acked, i.e. all 100 tuples are acked.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The error you're seeing was added
as part of
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666
to try to
>>>>>>>>>>>>>> avoid having that bug pop up again.
Could you try posting your spout
>>>>>>>>>>>>>> configuration? Also if possible,
it would be helpful if you could enable
>>>>>>>>>>>>>> debug logging for org.apache.storm.kafka.spout.KafkaSpout
>>>>>>>>>>>>>> and maybe also org.apache.storm.kafka.spout.internal.OffsetManager.
>>>>>>>>>>>>>> They log when offsets are committed
(e.g.
>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546),
>>>>>>>>>>>>>> and also when the consumer position
is changed (e.g.
>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561
>>>>>>>>>>>>>> ). It should be possible to track
down when/why the consumer position wound
>>>>>>>>>>>>>> up behind the committed offset.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just so you're aware, the check that
crashes the spout has
>>>>>>>>>>>>>> been removed as of
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-3102.
I'd still
>>>>>>>>>>>>>> like to know if there's a bug in
the spout causing it to emit tuples that
>>>>>>>>>>>>>> were already committed though.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Den man. 3. dec. 2018 kl. 11.29 skrev
saurabh mimani <
>>>>>>>>>>>>>> mimani.saurabh@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Version Info:
>>>>>>>>>>>>>>>    "org.apache.storm" % "storm-core"
% "1.2.1"
>>>>>>>>>>>>>>>    "org.apache.storm" % "storm-kafka-client"
% "1.2.1"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a storm topology which
looks like following:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> boltA -> boltB -> boltC
-> boltD
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> boltA just does some formatting
of requests and emits
>>>>>>>>>>>>>>> another tuple. boltB does some
processing and emits around
>>>>>>>>>>>>>>> 100 tuples for each tuple being
received. boltC and boltD processes
>>>>>>>>>>>>>>> these tuples. All the bolts implements
BaseBasicBolt.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What I am noticing is whenever
boltD marks some tuple as
>>>>>>>>>>>>>>> fail and marks that for retry
by throwing FailedException,
>>>>>>>>>>>>>>> After a few minutes less than
my topology timeout, I get the following
>>>>>>>>>>>>>>> error:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-11-30T20:01:05.261+05:30
util [ERROR] Async loop died!
>>>>>>>>>>>>>>> java.lang.IllegalStateException:
Attempting to emit a message that has already been committed. This should never occur when
using the at-least-once processing guarantee.
>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471)
~[stormjar.jar:?]
>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
~[stormjar.jar:?]
>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
~[stormjar.jar:?]
>>>>>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22)
[clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
[?:1.8.0_60]
>>>>>>>>>>>>>>> 2018-11-30T20:01:05.262+05:30
executor [ERROR]
>>>>>>>>>>>>>>> java.lang.IllegalStateException:
Attempting to emit a message that has already been committed. This should never occur when
using the at-least-once processing guarantee.
>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471)
~[stormjar.jar:?]
>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
~[stormjar.jar:?]
>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
~[stormjar.jar:?]
>>>>>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22)
[clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
[?:1.8.0_60]
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What seems to be happening is
this happens when boltB emits
>>>>>>>>>>>>>>> 100 out of 1 tuple and boltDfails
one of the tuples out of
>>>>>>>>>>>>>>> those 100 tuples, I am getting
this error. Not able to understand how to
>>>>>>>>>>>>>>> fix this, ideally it should ack
an original tuple when all
>>>>>>>>>>>>>>> 100 tuples are acked, but probably
an original tuple is acked before all
>>>>>>>>>>>>>>> those 100 tuples are acked, which
causes this error.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Peter Chamberlain* | Senior Software Engineer | HTK
>>>>>>>>>
>>>>>>>>> T: +44(0)870 600 2311
>>>>>>>>> Connect with me: Email <peter.chamberlain@htk.co.uk>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [image: htk logo] <http://www.htk.co.uk/>
>>>>>>>>>
>>>>>>>>> Connect with HTK: htk.co.uk <http://www.htk.co.uk/>
| LinkedIn
>>>>>>>>> <http://www.linkedin.com/company/htk/> | Twitter
>>>>>>>>> <http://www.twitter.com/htkhorizon>
>>>>>>>>>
>>>>>>>>> HTK Limited, Chapmans Warehouse, Wherry Quay, Ipswich,
IP4 1AS, UK.
>>>>>>>>> Company Registered in England and Wales as 3191677, VAT
Number 675
>>>>>>>>> 9467 71
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> PLEASE CONSIDER THE ENVIRONMENT BEFORE PRINTING THIS
EMAIL.
>>>>>>>>> This email is only for the use of the intended recipients
and may
>>>>>>>>> contain privileged information. If you’ve received
this email in error,
>>>>>>>>> please let the sender know; then delete the message.
The views
>>>>>>>>> expressed in this email represent those of the sender
and not necessarily
>>>>>>>>> of HTK.
>>>>>>>>>
>>>>>>>>

Mime
View raw message