From: Stig Rohde Døssing <stigdoessing@gmail.com>
To: user@storm.apache.org
Date: Sun, 9 Dec 2018 20:42:34 +0100
Subject: Re: Storm using Kafka Spout gives error: IllegalStateException
I'm assuming you applied the fix on top of 1.2.1 or something like that? The exception can't be thrown from the branch I linked, since the check was removed in an earlier commit on 1.x-branch.

Your logs show that the committed offset for partition 6 is 1682098 (98 for short). 98 was emitted, since it shows up in the emitted list. I'm guessing it failed and was replayed. 99 and up are in the acked list, so they are ready to commit as soon as 98 finishes processing.

The log shows that 99 is the tuple encountering the exception, so I'm guessing what happened is that 98 was acked and the spout decided to commit 98, 99 etc. For some reason it then still decides to emit 99. The only reasons I can think of (barring bugs in Kafka/the Kafka client) for that to happen would be that 99 is in waitingToEmit and isn't being cleared out during the commit (this is the bug I tried to fix), that 99 is somehow still queued for retry (this should not be possible), or that for some reason the consumer position ends up below the committed offset. I think the best bet for tracking down why it happens would be logging the contents of the RetryService, the contents of waitingToEmit, and the consumer position, both after commitOffsetsForAckedTuples and right before the exception is thrown.

Could you try logging those? I can add the log statements on top of the bugfix if needed.

On Sun, 9 Dec 2018 at 18:42, saurabh mimani <mimani.saurabh@gmail.com> wrote:

> Hey, I see this is still happening. This time it seems, guessing from the logs, to be because the same offset from a different partition was committed, but I'm not sure, as that case should be handled.
>
> Please find the logs here.
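The commit rule described above, where 99 and up are acked but nothing can be committed until 98 finishes, follows from the spout committing only a contiguous run of processed offsets per partition. Below is a minimal sketch of that bookkeeping in plain Java; it is a simplified model for illustration, not the actual storm-kafka-client OffsetManager.

```java
import java.util.TreeSet;

// Simplified model of per-partition commit bookkeeping. "committed" uses
// Kafka semantics: it is the next offset the consumer group expects, so
// committed == 98 means offset 98 itself has not finished processing yet.
class OffsetTracker {
    private long committed;
    private final TreeSet<Long> acked = new TreeSet<>();

    OffsetTracker(long committed) { this.committed = committed; }

    void ack(long offset) { acked.add(offset); }

    // Advance the committed offset across the contiguous run of acked
    // offsets starting at the current committed offset, and return it.
    // Acked offsets above a gap (a still-pending offset) stay parked.
    long commitContiguous() {
        while (acked.contains(committed)) {
            acked.remove(committed);
            committed++;
        }
        return committed;
    }
}
```

With committed = 98 and only 99 and 100 acked, `commitContiguous()` leaves the committed offset at 98; once 98 is acked, the next commit advances it to 101 in one step.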
>
> Best Regards
>
> Saurabh Kumar Mimani
>
> On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde Døssing <stigdoessing@gmail.com> wrote:
>
>> I believe I have a fix, your logs were helpful. Please try out the changes in https://github.com/apache/storm/pull/2923/files.
>>
>> On Sat, 8 Dec 2018 at 07:25, saurabh mimani <mimani.saurabh@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> Thanks for looking into this. I was not able to reproduce this earlier on my local machine, but I will try again. I was consistently able to reproduce it in cluster mode with a parallelism of 5 for boltA and a parallelism of 200 for boltB, on 2 machines.
>>>
>>> I will try again with your code.
>>>
>>> These are the logs of the Kafka spout from when I was able to reproduce it in cluster mode with my topology, in case they help.
>>>
>>> Best Regards
>>>
>>> Saurabh Kumar Mimani
>>>
>>> On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <stigdoessing@gmail.com> wrote:
>>>
>>>> I can't reproduce this.
>>>>
>>>> I created a test topology similar to the code you posted, based on the 1.2.1 release tag: https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564.
>>>>
>>>> I set up a local Kafka instance and put enough messages in the test topic to run the topology for 15 minutes or so. After populating the topic, I started the topology and let it run until it reached the end of the topic. As expected a lot of messages failed, but after a while it managed to successfully process all messages. I didn't see any worker crashes, and the logs only show some errors related to moving files (expected on Windows).
>>>>
>>>> The topology seems to work fine against both Kafka 0.10.2.2 and 1.1.1, though 1.1.1 is slower due to https://issues.apache.org/jira/browse/STORM-3102.
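The failure-and-recovery pattern in this test run, where many tuples fail but every message is eventually processed, is the at-least-once guarantee at work: failed tuples are replayed until they succeed. A toy simulation of that property in plain Java (no Storm dependency; all names here are made up for illustration):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Toy model of at-least-once replay: a message that fails processing is
// re-queued and retried until it succeeds, so every message is eventually
// processed even though many individual attempts fail along the way.
class ReplayQueue {
    int attempts = 0;
    final Set<Integer> processed = new HashSet<>();

    void drain(int messageCount, double failRate, long seed) {
        Random random = new Random(seed);
        Deque<Integer> queue = new ArrayDeque<>();
        for (int m = 0; m < messageCount; m++) queue.add(m);
        while (!queue.isEmpty()) {
            int msg = queue.poll();
            attempts++;
            if (random.nextDouble() < failRate) {
                queue.add(msg);          // failed: replay later
            } else {
                processed.add(msg);      // succeeded: acked
            }
        }
    }
}
```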
>>>>
>>>> The Kafka setup used the default configuration for both Kafka and Zookeeper, and Storm was set up with a local Nimbus, a single local Supervisor, and 4 worker slots.
>>>>
>>>> Saurabh, please try to reproduce the issue using the topology I linked. If you need to make adjustments in order to provoke the issue, please update the code and link it so I can check it out and try it.
>>>>
>>>> On Wed, 5 Dec 2018 at 16:42, saurabh mimani <mimani.saurabh@gmail.com> wrote:
>>>>
>>>>> No, I have checked that; there is no other consumer group consuming from the same topic.
>>>>>
>>>>> Thanks for looking into it, let me know if you need any other information.
>>>>>
>>>>> Best Regards
>>>>>
>>>>> Saurabh Kumar Mimani
>>>>>
>>>>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <stigdoessing@gmail.com> wrote:
>>>>>
>>>>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples to the input tuple. It's intended for bolts that just need to receive a tuple, synchronously process it, and emit some new tuples anchored to the input tuple. It's there because doing manual acking is tedious and error-prone in cases where you don't need to be able to, e.g., emit new unanchored tuples or ack the input tuple asynchronously. As Peter mentioned, the BasicBoltExecutor (https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42) handles acking for you.
>>>>>>
>>>>>> Saurabh, I'll see if I can reproduce your issue. Please also check that you don't have any other consumers using the same consumer group as the spout.
>>>>>>
>>>>>> On Wed, 5 Dec 2018 at
11:53, Peter Chamberlain <peter.chamberlain@htk.co.uk> wrote:
>>>>>>
>>>>>>> Pretty sure that the ack path is handled by BasicBoltExecutor (an implementation of IRichBolt), which calls collector.setContext(input), does the acking inside its execute function, and in between calls the BaseBasicBolt.execute version (which takes the collector as well as the tuple as parameters). So the intention is clearly that the tuple is automatically anchored and acknowledged.
>>>>>>>
>>>>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Saurabh,
>>>>>>>> I checked the BaseBasicBolt which comes with Storm; it doesn't do much. I also checked BasicOutputCollector and don't see how it will anchor automatically unless you call BasicOutputCollector.setContext(Tuple tuple). I don't see all of your code, but I don't see this call in your boltA code. It also looks like even when you make this setContext call, you have to emit using the following emit function:
>>>>>>>>
>>>>>>>> BasicOutputCollector.emit(String streamId, List<Object> tuples)
>>>>>>>>
>>>>>>>> Basically, just check that whatever emit function is called, it does pass the input tuple.
>>>>>>>>
>>>>>>>> Once I had exactly the same issue for a few days, but mine was related to config. I wanted to read from two Kafka topics in one topology and had two different KafkaSpouts created; I mistakenly copy-pasted the same config, and that caused this issue. Not sure if that applies to your scenario.
>>>>>>>>
>>>>>>>> *NOTE*: I checked the latest Storm master branch for the code.
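To illustrate the contract being debated here: a BasicBoltExecutor-style wrapper sets the current input tuple as the emit context, anchors every emit to it, and acks or fails the input around the delegate's execute call. The sketch below is a stripped-down plain-Java model with hypothetical names, not Storm's actual classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Thrown by a bolt to request replay of the input tuple.
class FailedException extends RuntimeException {}

// A tuple with an id and the ids it is anchored to.
class Tuple {
    final String id;
    final List<String> anchors = new ArrayList<>();
    Tuple(String id) { this.id = id; }
}

// Collector that implicitly anchors every emit to the current input tuple.
class BasicCollector {
    Tuple context;                        // set by the executor per input
    final List<Tuple> emitted = new ArrayList<>();
    void emit(Tuple out) {
        out.anchors.add(context.id);      // automatic anchoring
        emitted.add(out);
    }
}

// Executor wrapping a BaseBasicBolt-style execute(tuple, collector) body.
class Executor {
    final BasicCollector collector = new BasicCollector();
    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    void execute(Tuple input, BiConsumer<Tuple, BasicCollector> bolt) {
        collector.context = input;        // setContext(input)
        try {
            bolt.accept(input, collector);
            acked.add(input.id);          // auto-ack on normal return
        } catch (FailedException e) {
            failed.add(input.id);         // fail: the spout will replay it
        }
    }
}
```

The point of the wrapper is that the bolt body never touches acking: returning normally acks the input, and throwing FailedException fails it.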
>>>>>>>>
>>>>>>>> On Wed, 5 Dec 2018 at 08:11, saurabh mimani <mimani.saurabh@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey Ravi,
>>>>>>>>>
>>>>>>>>> I am using *BaseBasicBolt*, which, as described here: "Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes."
>>>>>>>>>
>>>>>>>>> What you are saying is applicable to *BaseRichBolt*. The Kafka spout I am using is from the storm-kafka-client library, so the unique ID etc. should already be taken care of.
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>
>>>>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Saurabh,
>>>>>>>>>> I think there is an issue with the part of the code which is emitting the tuples.
>>>>>>>>>>
>>>>>>>>>> If you want to use the ack mechanism, you need to use an anchor tuple when you emit from bolts:
>>>>>>>>>>
>>>>>>>>>> collector.emit(Tuple input, Values data)
>>>>>>>>>>
>>>>>>>>>> Also make sure the Kafka spout emits tuples with a unique ID.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Ravi
>>>>>>>>>>
>>>>>>>>>> On Wed, 5 Dec 2018 at 06:35, saurabh mimani <mimani.saurabh@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your reply. What you are saying is correct. However, I am able to reproduce it more often, and I think it happens when multiple tuples fail in the first run but all of them succeed on retry; something of that sort is happening.
>>>>>>>>>>>
>>>>>>>>>>> This can be reproduced easily using the following two bolts and the KafkaSpout, by running in cluster mode for 3-4 minutes:
>>>>>>>>>>>
>>>>>>>>>>> *BoltA*
>>>>>>>>>>>
>>>>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>>>>
>>>>>>>>>>> class BoltA extends BaseBasicBolt {
>>>>>>>>>>>
>>>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>>>>
>>>>>>>>>>>     var i = 0
>>>>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>>>>     (1 to 100).foreach { _ =>
>>>>>>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>>>>>>       i += 1
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> *BoltB*
>>>>>>>>>>>
>>>>>>>>>>> class BoltB extends BaseBasicBolt {
>>>>>>>>>>>
>>>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>>>>     println(s"Received ${abc.index}th tuple in BoltB")
>>>>>>>>>>>     if (abc.index >= 97 && abc.rand) {
>>>>>>>>>>>       println(s"throwing FailedException for ${abc.index}th tuple")
>>>>>>>>>>>       throw new FailedException()
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> *KafkaSpout:*
>>>>>>>>>>>
>>>>>>>>>>> private def getKafkaSpoutConfig(source: Config) = KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>>>>>>>>>>>   .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>>>>>>>>>>>   .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>>>   .setOffsetCommitPeriodMs(100)
>>>>>>>>>>>   .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>>>>     KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>     KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>     10,
>>>>>>>>>>>     KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>>>>   ))
>>>>>>>>>>>   .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
>>>>>>>>>>>   .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
>>>>>>>>>>>   .build()
>>>>>>>>>>>
>>>>>>>>>>> Other config:
>>>>>>>>>>>
>>>>>>>>>>> messageTimeoutInSeconds: 300
>>>>>>>>>>>
>>>>>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08 PM.png]
>>>>>>>>>>>
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <stigdoessing@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>
>>>>>>>>>>>> The tuple emitted by the spout will only be acked once all branches of the tuple tree have been acked, i.e. all 100 tuples are acked.
>>>>>>>>>>>>
>>>>>>>>>>>> The error you're seeing was added as part of https://issues.apache.org/jira/browse/STORM-2666 to try to avoid having that bug pop up again. Could you try posting your spout configuration? Also, if possible, it would be helpful if you could enable debug logging for org.apache.storm.kafka.spout.KafkaSpout and maybe also org.apache.storm.kafka.spout.internal.OffsetManager. They log when offsets are committed (e.g.
>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546), and also when the consumer position is changed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561). It should be possible to track down when/why the consumer position wound up behind the committed offset.
>>>>>>>>>>>>
>>>>>>>>>>>> Just so you're aware, the check that crashes the spout has been removed as of https://issues.apache.org/jira/browse/STORM-3102. I'd still like to know if there's a bug in the spout causing it to emit tuples that were already committed, though.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, 3 Dec 2018 at 11:29, saurabh mimani <mimani.saurabh@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Version Info:
>>>>>>>>>>>>> "org.apache.storm" % "storm-core" % "1.2.1"
>>>>>>>>>>>>> "org.apache.storm" % "storm-kafka-client" % "1.2.1"
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a Storm topology which looks like the following:
>>>>>>>>>>>>>
>>>>>>>>>>>>> boltA -> boltB -> boltC -> boltD
>>>>>>>>>>>>>
>>>>>>>>>>>>> boltA just does some formatting of requests and emits another tuple. boltB does some processing and emits around 100 tuples for each tuple received. boltC and boltD process these tuples. All the bolts implement BaseBasicBolt.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What I am noticing is that whenever boltD marks some tuple as failed and marks it for retry by throwing FailedException, then after a few minutes (less than my topology timeout) I get the following error:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>>>
>>>>>>>>>>>>> This seems to happen when boltB emits 100 tuples for 1 input tuple and boltD fails one of those 100 tuples. I am not able to understand how to fix this; ideally the spout should ack the original tuple only when all 100 tuples are acked, but possibly the original tuple is acked before all those 100 tuples are acked, which causes this error.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Saurabh Kumar Mimani
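For reference, Storm enforces the "ack the spout tuple only when the whole tree is acked" rule through its acker tasks, which keep a running XOR of random 64-bit tuple ids: each id is XORed in when a tuple is anchored and XORed in again when it is acked, so the ledger returns to zero exactly when every tuple in the tree has been acked. A minimal sketch of that ledger (plain Java, illustrative only):

```java
// Simplified sketch of Storm's acker bookkeeping for one spout tuple.
// Because x ^ x == 0, anchoring and acking the same id cancel out, and
// the running value reaches 0 only once every tuple in the tree is acked
// (a 64-bit collision making it hit 0 early is astronomically unlikely).
class AckerLedger {
    private long ackVal = 0;

    void anchor(long tupleId) { ackVal ^= tupleId; }  // tuple created
    void ack(long tupleId)    { ackVal ^= tupleId; }  // tuple processed

    boolean treeComplete() { return ackVal == 0; }
}
```

This is why a single failed tuple out of the 100 children keeps the original spout tuple pending: the failed tuple's id is never XORed out, so the ledger stays nonzero until the replay succeeds or the tree times out.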
I'm assuming you applied the fix= on top of 1.2.1 or something like that? The exception can't be thrown = from the branch I linked, since it was removed in an earlier commit in 1.x-= branch.

Your logs show that the committed offset f= or partition 6 is 1682098 (98 for short). 98 was emitted, since it shows up= in the emitted list. I'm guessing it failed and was replayed. 99 and u= p are in the acked list, so they are ready to commit as soon as 98 finishes= processing.

The log shows that 99 is the tuple en= countering the exception, so I'm guessing what happened is that 98 was = acked and the spout decided to commit 98, 99 etc. For some reason it then s= till decides to emit 99. The only reasons I can think of (barring bugs in K= afka/the Kafka client) for that to happen would be that 99 is in waitingToE= mit and isn't being cleared out during the commit (this is the bug I tr= ied to fix), somehow 99 is still queued for retry (this should not be possi= ble) or for some reason the consumer position ends up below the committed o= ffset. I think the best bet for tracking down why it happens would be loggi= ng the contents of the RetryService, the contents of waitingToEmit and the = consumer position both after commitOffsetsForAckedTuples, and right before = the exception is thrown.

Could you try loggin= g those? I can add the log statements on top of the bugfix if needed.

Den s=C3=B8= n. 9. dec. 2018 kl. 18.42 skrev saurabh mimani <mimani.saurabh@gmail.com>:
Hey, I see this is s= till happening, this time, it seems, as it seemed to me, because same offse= t from different partition was committed(guessing from logs), but not sure = as that should be handled.

Please find the logs here.
=C2=A0


Best Regards

Sa= urabh Kumar Mimani




On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde D=C3=B8ssing <stigdoessing@gmail.com&= gt; wrote:
I believe I have a fix, your logs were helpful= . Please try out the changes in https://github.com/apache/storm/pull/292= 3/files.


On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde D=C3= =B8ssing <st= igdoessing@gmail.com> wrote:
I can't reproduce this.

I created a test topo= logy similar to the code you posted, based on the 1.2.1 release tag https://github.com/srdo/storm/commit/f5577f7a773f= 3d433b90a2670de5329b396f5564.

I set up a local= Kafka instance and put enough messages to run the topology for 15 minutes = or so in the test topic. After populating the topic, I started the topology= and let it run until it reached the end of the topic. As expected a lot of= messages failed, but after a while it managed to successfully process all = messages. I didn't see any worker crashes, and the logs only show some = errors related to moving files (expected on Windows).

<= div>The topology seems to work fine against both Kafka 0.10.2.2 and 1.1.1, = though 1.1.1 is slower due to https://issues.apache.org/jira/browse/STO= RM-3102.

The Kafka setup was with the def= ault configuration for both Kafka and Zookeeper, and Storm was set up with = a local Nimbus, single local Supervisor and 4 worker slots.

<= /div>
Saurabh please try to reproduce the issue using the topology I li= nked. If you need to make adjustments in order to provoke the issue, please= update the code and link it so I can check it out and try it.

Den ons. 5. = dec. 2018 kl. 16.42 skrev saurabh mimani <mimani.saurabh@gmail.com>:
=
No, I ha= ve checked that, there is no other consumer group consuming from the same.<= div>
Thanks for looking into it, let me know if you need any = other=C2=A0information.
=C2=A0
=


Best Regards

= Saurabh Kumar Mimani




On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde= D=C3=B8ssing <stigdoessing@gmail.com> wrote:
Ravi, BaseBasicBolt does automatically anchor any emitted tuples to the in= put tuple. It's intended for bolts that just need to receive a tuple, s= ynchronously process it and emit some new tuples anchored to the input tupl= e. It's there because doing manual acking is tedious and error-prone in= cases where you don't need to be able to e.g. emit new unachored tuple= s or ack the input tuple asynchronously. As Peter mentioned, the BasicBoltE= xecutor (https://github.com/apache/storm/bl= ob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache= /storm/topology/BasicBoltExecutor.java#L42) handles acking for you.
=

Saurabh, I'll see if I can reprod= uce your issue. Please also check that you don't have any other consume= rs using the same consumer group as the spout.

Den ons. 5. dec. 2018 kl. 11.53 sk= rev Peter Chamberlain <peter.chamberlain@htk.co.uk>:
Pretty sure that the ack path is handled by BasicBoltExecutor = (an implmentation of IRichBolt), which calls collector.setContext(input), a= nd also does the acking inside it's execute function, and in between ca= lls the BaseBasicBolt.execute version (which takes the collector as well as= the tuple as parameters).
So the intention is clearly that it is= automatically anchored and acknowledged.

On Wed, 5 Dec 2018 at 09:57, Ravi Sharma = <ping2ravi@gmai= l.com> wrote:
Hi Saurabh,
I checked the BaseBasic= Bolt which comes with storm, it doesn't do much.
Also checked=C2=A0BasicOutputCollector=C2=A0and don't see= how it will anchor automatically unless you call=C2=A0 =C2=A0 =C2=A0BasicO= utputCollector.setContext(Tuple tuple), don't see all of your code, but= don't see this call in your boltA code.
Also it= looks like even when you make this setContext call, after that you will ha= ve to emit using following emit function

<= div dir=3D"auto">BasicOutputCollector.emit(String streamId, List<O= bject> tuples)

=
Basically just check that whatever emit function is calle= d, it does pass the input tuple.


Once I had exactly same issue for few= days, but mine was related to config. I wanted to read from two Kafka topi= cs in one topology and had two different kafkaspout created, mistakenly I c= opy pasted same config and that caused this issue. Not sure if that applies= to your scenario.


<= /div>
NOTE: I checked the latest storm master branc= h for code.






On Wed, 5 Dec 2018, 08:11 saura= bh mimani <mimani.saurabh@gmail.com wrote:
Hey Ravi,

I am usi= ng=C2=A0BaseBasicBolt, which as described here : Tuples emitte= d to=C2=A0BasicOutputCollector=C2=A0are automatically anchored to the = input tuple, and the input tuple is acked for you automatically when the ex= ecute method completes.

What you are saying is applicable for=C2=A0BaseRichBolt. The Kafka spout I am using is from storm-kafka-client library, so unique= ID, etc should be already taken care of.
=C2=A0


Best Regards

Saurabh Kumar Mimani




On Wed, Dec 5, 2018 at 1= 2:52 PM Ravi Sharma <ping2ravi@gmail.com> wrote:
Hi Saura= bh,
I think there is issue with part of code which is emit= ting the tuples.

If you = want to use ack mechanism, you need to use anchor tuple when you emit from = bolts.

Collector.emit(Tu= ple input, Values data)

= Also make sure Kafka spout emits tuple with a unique id.

Thanks
Ravi

On Wed, 5 Dec 2018, 06:35= saurabh mimani <mimani.saurabh@gmail.com wrote:
Hey,=C2=A0
Thanks for your reply. What you are saying is correct, Ho= wever I am able to reproduce it more often and I think it happens when mult= iple tuples gets failed in first run but all of those gets success on retry= , something of that sort is happening.

This can be= reproduced using following two bolts and kafkaSpout easily, by running in = cluster more with 3/4 minutes:

BoltA

<=
code style=3D"margin:0px;padding:0px;border:0px none;font-style:inherit;fon=
t-variant:inherit;font-weight:inherit;font-stretch:inherit;line-height:inhe=
rit;font-family:Consolas,Menlo,Monaco,"Lucida Console","Libe=
ration Mono","DejaVu Sans Mono","Bitstream Vera Sans Mo=
no","Courier New",monospace,sans-serif;vertical-align:baseli=
ne;box-sizing:inherit;white-space:inherit">case class Abc(index: Int, rand:=
 Boolean)

class BoltA  extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit=
 =3D {
    val inp =3D input.getBinaryByField("value").getObj[someObj]
    val randomGenerator =3D new Random()

    var i =3D 0
    val rand =3D randomGenerator.nextBoolean()
    1 to 100 foreach {
      collector.emit(new Values(Abc(i, rand).getJsonBytes))
      i +=3D 1
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
=3D {
    declarer.declare(new Fields("boltAout"))
  }

}

BoltB

=
class BoltB  extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit=
 =3D {
    val abc =3D input.getBinaryByField("boltAout").getObj[Abc]
    println(s"Received ${abc.index}th tuple in BoltB")
    if(abc.index >=3D 97 && abc.rand){
      println(s"throwing FailedException for ${abc.index}th tuple for&=
quot;)
      throw new FailedException()
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
=3D {
  }
}

KafkaSpout:

private def getKafkaSpoutConfig(source: Config) = KafkaSpoutConfig
    .builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setOffsetCommitPeriodMs(100)
    .setRetry(new KafkaSpoutRetryExponentialBackoff(
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      10,
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
    ))
    .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
    .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
    .build()
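As a side note on the retry settings above: the four arguments to KafkaSpoutRetryExponentialBackoff are the initial delay, the delay period, the max number of retries, and the max delay. A rough model of a capped exponential backoff schedule under those values is sketched below; the doubling formula and the class name are my own assumptions for illustration, not Storm's exact implementation.

```java
// Sketch of a capped exponential backoff schedule, assuming the values
// configured above: initialDelay = 100 ms, maxRetries = 10, maxDelay = 3000 ms.
// The doubling formula is an illustrative assumption, not Storm's exact code.
public class BackoffSketch {
    // Delay before the given retry attempt: doubles each retry, capped at maxDelay.
    static long delayMs(int retryCount, long initialMs, long maxDelayMs) {
        long delay = initialMs << retryCount; // initial * 2^retryCount
        return Math.min(delay, maxDelayMs);
    }

    public static void main(String[] args) {
        for (int r = 0; r <= 6; r++) {
            System.out.println("retry " + r + " -> " + delayMs(r, 100, 3000) + " ms");
        }
    }
}
```

Under this model a tuple that keeps failing quickly reaches the 3000 ms cap and is then retried at a steady rate until the max retry count is exhausted.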

Other config:

messageTimeoutInSeconds: 300


[Screenshot]

Best Regards

Saurabh Kumar Mimani

On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <stigdoessing@gmail.com> wrote:
Hi Saurabh,

The tuple emitted by the spout will only be acked once all branches of the tuple tree have been acked, i.e. all 100 tuples are acked.
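That tuple-tree behavior can be sketched as a toy model. The class below is purely illustrative (it is not Storm's actual acker implementation, and the name is made up): the spout tuple completes only after every branch is acked, and fails as soon as any branch fails.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of tuple-tree completion: a spout tuple with N child tuples is
// acked only when all N children are acked, and failed if any child fails.
// Illustrative only -- not Storm's internal acker.
public class TupleTreeSketch {
    private final AtomicInteger pendingChildren;
    private volatile boolean failed = false;

    public TupleTreeSketch(int children) {
        this.pendingChildren = new AtomicInteger(children);
    }

    public void ackChild() { pendingChildren.decrementAndGet(); }

    public void failChild() { failed = true; }

    // The spout's ack() callback would fire only in this state.
    public boolean spoutTupleAcked() { return !failed && pendingChildren.get() == 0; }

    // Any failed branch fails the whole tree, triggering a replay from the spout.
    public boolean spoutTupleFailed() { return failed; }
}
```

So with 100 child tuples, acking 99 of them is not enough; and a single FailedException in a downstream bolt replays the whole tree.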

The error you're seeing was added as part of https://issues.apache.org/jira/browse/STORM-2666 to try to avoid having that bug pop up again. Could you try posting your spout configuration? Also if possible, it would be helpful if you could enable debug logging for org.apache.storm.kafka.spout.KafkaSpout and maybe also org.apache.storm.kafka.spout.internal.OffsetManager. They log when offsets are committed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546), and also when the consumer position is changed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561). It should be possible to track down when/why the consumer position wound up behind the committed offset.
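Enabling that debug logging might look like this in the worker's log4j2 configuration (a sketch only; adjust to your actual logging setup and config file location):

```xml
<!-- Sketch: add inside the <Loggers> section of the worker log4j2 config -->
<Logger name="org.apache.storm.kafka.spout.KafkaSpout" level="debug"/>
<Logger name="org.apache.storm.kafka.spout.internal.OffsetManager" level="debug"/>
```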

Just so you're aware, the check that crashes the spout has been removed as of https://issues.apache.org/jira/browse/STORM-3102. I'd still like to know if there's a bug in the spout causing it to emit tuples that were already committed though.

On Mon, Dec 3, 2018 at 11:29 AM saurabh mimani <mimani.saurabh@gmail.com> wrote:

Version Info:
   "org.apache.storm" % "storm-core" % "1.2.1"
   "org.apache.storm" % "storm-kafka-client" % "1.2.1"

I have a storm topology which looks like following:

boltA -> boltB -> boltC -> boltD

boltA just does some formatting of requests and emits another tuple. boltB does some processing and emits around 100 tuples for each tuple received. boltC and boltD process these tuples. All the bolts implement BaseBasicBolt.

What I am noticing is that whenever boltD marks some tuple as failed for retry by throwing FailedException, then after a few minutes (less than my topology timeout) I get the following error:

2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
2018-11-30T20:01:05.262+05:30 executor [ERROR]
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]

This seems to happen when boltB emits 100 tuples for one incoming tuple and boltD fails one of those 100 tuples. I am not able to understand how to fix this; ideally it should ack the original tuple only when all 100 tuples are acked, but probably the original tuple is acked before all those 100 tuples are acked, which causes this error.



Best Regards

Saurabh Kumar Mimani




