From: saurabh mimani <mimani.saurabh@gmail.com>
Date: Sat, 8 Dec 2018 11:55:00 +0530
Subject: Re: Storm using Kafka Spout gives error: IllegalStateException
To: user@storm.apache.org

Hey,

Thanks for looking into this. I was not able to reproduce this earlier on my local machine, but I will try once more. I was consistently able to reproduce it in cluster mode with a parallelism of 5 for boltA and a parallelism of 200 for boltB on 2 machines.

I will try again with your code.

These are the logs of the Kafka spout from when I was able to reproduce it in cluster mode with my topology, in case they help.

Best Regards

Saurabh Kumar Mimani

On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <stigdoessing@gmail.com> wrote:

> I can't reproduce this.
>
> I created a test topology similar to the code you posted, based on the 1.2.1 release tag:
> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564
>
> I set up a local Kafka instance and put enough messages in the test topic to run the topology for 15 minutes or so. After populating the topic, I started the topology and let it run until it reached the end of the topic. As expected, a lot of messages failed, but after a while it managed to successfully process all messages. I didn't see any worker crashes, and the logs only showed some errors related to moving files (expected on Windows).
>
> The topology seems to work fine against both Kafka 0.10.2.2 and 1.1.1, though 1.1.1 is slower due to https://issues.apache.org/jira/browse/STORM-3102.
>
> Kafka and Zookeeper ran with their default configuration, and Storm was set up with a local Nimbus, a single local Supervisor, and 4 worker slots.
>
> Saurabh, please try to reproduce the issue using the topology I linked. If you need to make adjustments to provoke the issue, please update the code and link it so I can check it out and try it.
>
> On Wed, 5 Dec 2018 at 16:42, saurabh mimani <mimani.saurabh@gmail.com> wrote:
>
>> No, I have checked that; there is no other consumer group consuming from the same topic.
>>
>> Thanks for looking into it. Let me know if you need any other information.
>>
>> Best Regards
>>
>> Saurabh Kumar Mimani
>>
>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <stigdoessing@gmail.com> wrote:
>>
>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples to the input tuple. It is intended for bolts that just need to receive a tuple, synchronously process it, and emit some new tuples anchored to the input tuple. It exists because manual acking is tedious and error-prone in cases where you don't need to, for example, emit new unanchored tuples or ack the input tuple asynchronously. As Peter mentioned, the BasicBoltExecutor (https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42) handles acking for you.
>>>
>>> Saurabh, I'll see if I can reproduce your issue. Please also check that you don't have any other consumers using the same consumer group as the spout.
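To make the mechanism Stig and Peter describe concrete: the wrapper does very little. The following Scala sketch paraphrases the logic of BasicBoltExecutor.execute; it is a simplified illustration, not the actual storm-client source (which is Java):

import org.apache.storm.topology.{BasicOutputCollector, FailedException, IBasicBolt}
import org.apache.storm.tuple.Tuple

// Sketch: set the input as the implicit anchor, run the user bolt, then ack.
// If the bolt throws FailedException, the input is failed instead, so the
// spout will eventually replay it.
class BasicBoltExecutorSketch(bolt: IBasicBolt, collector: BasicOutputCollector) {
  def execute(input: Tuple): Unit = {
    collector.setContext(input)          // emits inside execute are anchored to `input`
    try {
      bolt.execute(input, collector)     // user code runs synchronously
      collector.getOutputter.ack(input)  // input is acked only after execute returns
    } catch {
      case _: FailedException =>
        collector.getOutputter.fail(input) // failed input -> replayed by the spout
    }
  }
}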
>>> On Wed, 5 Dec 2018 at 11:53, Peter Chamberlain <peter.chamberlain@htk.co.uk> wrote:
>>>
>>>> Pretty sure that the ack path is handled by BasicBoltExecutor (an implementation of IRichBolt), which calls collector.setContext(input), does the acking inside its execute function, and in between calls the BaseBasicBolt.execute version (which takes the collector as well as the tuple as parameters).
>>>> So the intention is clearly that it is automatically anchored and acknowledged.
>>>>
>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>
>>>>> Hi Saurabh,
>>>>> I checked the BaseBasicBolt that comes with Storm; it doesn't do much. I also checked BasicOutputCollector and don't see how it would anchor automatically unless you call BasicOutputCollector.setContext(Tuple tuple). I don't see all of your code, but I don't see this call in your boltA code.
>>>>> It also looks like even when you make this setContext call, you then have to emit using the following emit function:
>>>>>
>>>>> BasicOutputCollector.emit(String streamId, List<Object> tuples)
>>>>>
>>>>> Basically, just check that whatever emit function is called, it passes the input tuple.
>>>>>
>>>>> Once I had exactly the same issue for a few days, but mine was related to config. I wanted to read from two Kafka topics in one topology and had two different Kafka spouts created; I mistakenly copy-pasted the same config, and that caused this issue. Not sure if that applies to your scenario.
>>>>>
>>>>> NOTE: I checked the latest Storm master branch for code.
>>>>>
>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <mimani.saurabh@gmail.com> wrote:
>>>>>
>>>>>> Hey Ravi,
>>>>>>
>>>>>> I am using BaseBasicBolt, which, as described here: Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes.
>>>>>>
>>>>>> What you are saying is applicable to BaseRichBolt. The Kafka spout I am using is from the storm-kafka-client library, so unique IDs etc. should already be taken care of.
>>>>>>
>>>>>> Best Regards
>>>>>>
>>>>>> Saurabh Kumar Mimani
>>>>>>
>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Saurabh,
>>>>>>> I think there is an issue with the part of the code which emits the tuples.
>>>>>>>
>>>>>>> If you want to use the ack mechanism, you need to anchor the input tuple when you emit from bolts:
>>>>>>>
>>>>>>> Collector.emit(Tuple input, Values data)
>>>>>>>
>>>>>>> Also make sure the Kafka spout emits tuples with a unique ID.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Ravi
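For comparison, the manual style Ravi describes is what you would write when extending BaseRichBolt, where anchoring and acking are the bolt author's responsibility. A minimal sketch (the bolt and field names here are hypothetical; the API is as in storm-core 1.2.x):

import org.apache.storm.task.{OutputCollector, TopologyContext}
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

// Sketch: with BaseRichBolt, nothing is anchored or acked for you.
class ManuallyAnchoredBolt extends BaseRichBolt {
  private var collector: OutputCollector = _

  override def prepare(conf: java.util.Map[_, _], ctx: TopologyContext, out: OutputCollector): Unit =
    collector = out

  override def execute(input: Tuple): Unit = {
    // Pass the input tuple as the anchor so the new tuple joins its tuple tree.
    collector.emit(input, new Values("data"))
    // Ack explicitly; forgetting this causes timeouts and replays.
    collector.ack(input)
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("out"))
}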
>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani <mimani.saurabh@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>>
>>>>>>>> Thanks for your reply. What you are saying is correct. However, I am able to reproduce it fairly often, and I think it happens when multiple tuples fail on the first run but all of them succeed on retry, or something of that sort.
>>>>>>>>
>>>>>>>> This can be reproduced easily with the following two bolts and a Kafka spout, by running in cluster mode for 3-4 minutes:
>>>>>>>>
>>>>>>>> BoltA
>>>>>>>>
>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>
>>>>>>>> class BoltA extends BaseBasicBolt {
>>>>>>>>
>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>
>>>>>>>>     var i = 0
>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>     (1 to 100).foreach { _ =>
>>>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>>>       i += 1
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> BoltB
>>>>>>>>
>>>>>>>> class BoltB extends BaseBasicBolt {
>>>>>>>>
>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>     println(s"Received ${abc.index}th tuple in BoltB")
>>>>>>>>     if (abc.index >= 97 && abc.rand) {
>>>>>>>>       println(s"throwing FailedException for ${abc.index}th tuple")
>>>>>>>>       throw new FailedException()
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> KafkaSpout:
>>>>>>>>
>>>>>>>> private def getKafkaSpoutConfig(source: Config) = KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>>>>>>>>   .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>>>>>>>>   .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>   .setOffsetCommitPeriodMs(100)
>>>>>>>>   .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>     KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>     KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>     10,
>>>>>>>>     KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>   ))
>>>>>>>>   .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
>>>>>>>>   .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
>>>>>>>>   .build()
>>>>>>>>
>>>>>>>> Other config:
>>>>>>>>
>>>>>>>> messageTimeoutInSeconds: 300
>>>>>>>>
>>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08 PM.png]
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Saurabh Kumar Mimani
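As a rough sanity check on the retry settings above: the exact schedule is computed inside KafkaSpoutRetryExponentialBackoff, but assuming the common reading that the delay starts at delayPeriod and doubles per retry, capped at maxDelay, ten retries of a single tuple finish well inside the 300-second message timeout:

// Back-of-the-envelope sketch for the config above (an assumption about the
// backoff shape, not the library's exact formula):
// initialDelay = 100 ms, delayPeriod = 100 ms, maxRetries = 10, maxDelay = 3000 ms.
object RetryBudgetSketch extends App {
  val initialDelayMs = 100L
  val delayPeriodMs  = 100L
  val maxDelayMs     = 3000L
  val maxRetries     = 10

  // Assumed delay(n) = min(delayPeriod * 2^(n-1), maxDelay) after the first retry.
  val delays = initialDelayMs +: (1 until maxRetries).map { n =>
    math.min(delayPeriodMs * (1L << (n - 1)), maxDelayMs)
  }
  println(delays.mkString(", "))           // 100, 100, 200, 400, ..., 3000
  println(s"worst case ~${delays.sum} ms") // ~15.2 s total, far below the 300 s timeout
}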
>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <stigdoessing@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Saurabh,
>>>>>>>>>
>>>>>>>>> The tuple emitted by the spout will only be acked once all branches of the tuple tree have been acked, i.e. all 100 tuples are acked.
>>>>>>>>>
>>>>>>>>> The error you're seeing was added as part of https://issues.apache.org/jira/browse/STORM-2666 to try to avoid having that bug pop up again. Could you try posting your spout configuration? Also, if possible, it would be helpful if you could enable debug logging for org.apache.storm.kafka.spout.KafkaSpout and maybe also org.apache.storm.kafka.spout.internal.OffsetManager. They log when offsets are committed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546) and also when the consumer position is changed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561). It should be possible to track down when/why the consumer position wound up behind the committed offset.
>>>>>>>>>
>>>>>>>>> Just so you're aware, the check that crashes the spout has been removed as of https://issues.apache.org/jira/browse/STORM-3102. I'd still like to know if there's a bug in the spout causing it to emit tuples that were already committed, though.
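The exception in this thread comes from a consistency check in the spout's emit path. Roughly, as a paraphrased sketch of the guard around KafkaSpout.java:471 in v1.2.1 (simplified to the essential comparison, not the actual source):

// Simplified sketch: a polled record may only be emitted (or retried) if its
// offset has not already been committed for its topic-partition.
def emitOrRetrySketch(recordOffset: Long, committedOffset: Long): Unit = {
  if (recordOffset < committedOffset) {
    // With at-least-once semantics the consumer position should never fall
    // behind the committed offset, hence the "should never occur" wording.
    throw new IllegalStateException(
      "Attempting to emit a message that has already been committed.")
  }
  // ... otherwise emit the tuple downstream ...
}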
>>>>>>>>> On Mon, 3 Dec 2018 at 11:29, saurabh mimani <mimani.saurabh@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Version Info:
>>>>>>>>>>   "org.apache.storm" % "storm-core" % "1.2.1"
>>>>>>>>>>   "org.apache.storm" % "storm-kafka-client" % "1.2.1"
>>>>>>>>>>
>>>>>>>>>> I have a Storm topology which looks like the following:
>>>>>>>>>>
>>>>>>>>>> boltA -> boltB -> boltC -> boltD
>>>>>>>>>>
>>>>>>>>>> boltA just does some formatting of requests and emits another tuple. boltB does some processing and emits around 100 tuples for each tuple received. boltC and boltD process these tuples. All the bolts implement BaseBasicBolt.
>>>>>>>>>>
>>>>>>>>>> What I am noticing is that whenever boltD marks some tuple as failed and marks it for retry by throwing FailedException, then after a few minutes (less than my topology timeout) I get the following error:
>>>>>>>>>>
>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>
>>>>>>>>>> What seems to be happening is that when boltB emits 100 tuples for 1 input tuple and boltD fails one of those 100 tuples, I get this error. I am not able to understand how to fix this; ideally Storm should ack the original tuple only when all 100 tuples are acked, but perhaps the original tuple is acked before all those 100 tuples are acked, which causes this error.
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Saurabh Kumar Mimani
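As Stig confirms above, the behavior Saurabh expects is how Storm's acking is designed to work: the spout tuple is acked only when the whole tuple tree is acked. Storm tracks this with a running XOR of anchor ids per spout tuple. A toy illustration of that bookkeeping (conceptual only, not the actual acker implementation):

import scala.util.Random

// Each anchored emit XORs a random 64-bit id into the spout tuple's checksum,
// and each ack XORs the same id out again. The spout tuple is complete only
// when the checksum returns to zero, i.e. when all 100 children are acked.
object AckerSketch extends App {
  val childIds = List.fill(100)(Random.nextLong()) // ids for boltB's 100 emits
  var checksum = 0L
  childIds.foreach(id => checksum ^= id) // boltB emits 100 anchored tuples
  childIds.foreach(id => checksum ^= id) // downstream bolts ack each one
  println(s"spout tuple complete: ${checksum == 0L}")
  // If boltD throws FailedException for even one child, its id is never XORed
  // out; the checksum stays non-zero, the spout's fail() is called, and the
  // message is scheduled for retry instead of being acked.
}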