From: Ismael Juma
Date: Wed, 6 Jan 2016 14:57:47 +0000
Subject: Re: KIP-41: KafkaConsumer Max Records
To: dev@kafka.apache.org

Thanks Jason. I read the KIP and it makes sense to me. A minor suggestion: in
the "Ensuring Fair Consumption" section, there are three paragraphs using two
different examples (2 partitions with 100 max.poll.records and 3 partitions
with 30 max.poll.records). I think you could simplify this by using a single
example across all three paragraphs.

Ismael

On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson wrote:

> I've updated the KIP with some implementation details. I also added more
> discussion on the heartbeat() alternative. The short answer for why we
> rejected this API is that it doesn't seem to work well with offset commits.
> This would tend to make correct usage complicated and difficult to explain.
> Additionally, we don't see any clear advantages over having a way to set
> the max records. For example, using max.records=1 would be equivalent to
> invoking heartbeat() on each iteration of the message processing loop.
>
> Going back to the discussion on whether we should use a configuration value
> or overload poll(), I'm leaning toward the configuration option, mainly for
> compatibility and to keep the KafkaConsumer API from getting any more
> complex. Also, as others have mentioned, it seems reasonable to want to
> tune this setting in the same place that the session timeout and heartbeat
> interval are configured. I still feel a little uncomfortable with the need
> to do a lot of configuration tuning to get the consumer working for a
> particular environment, but hopefully the defaults are conservative enough
> that most users won't need to. However, if it remains a problem, we could
> still look into better options for managing the size of batches, including
> overloading poll() with a max records argument or possibly implementing a
> batch scaling algorithm internally.
>
> -Jason
>
> On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson wrote:
>
> > Hi Cliff,
> >
> > I think we're all agreed that the current contract of poll() should be
> > kept. The consumer wouldn't wait for max messages to become available in
> > this proposal; it would only ensure that it never returns more than max
> > messages.
> >
> > -Jason
> >
> > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne wrote:
> >
> >> Instead of a heartbeat, I'd prefer poll() to return whatever messages
> >> the client has. Either a) I don't care if I get less than my max
> >> message limit, or b) I do care and will set a larger timeout. Case B is
> >> less common than A and is fairly easy to handle in the application's
> >> code.
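The poll() contract being discussed is easy to state in code. Below is a minimal sketch in plain Java with no Kafka dependency; the buffered-records model and the `maxPollRecords` name are illustrative assumptions based on the KIP discussion, not the actual client internals:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of the proposed contract: poll() drains at most maxPollRecords
// of what is already fetched, and never waits for more records to arrive.
class MaxRecordsPoll {
    private final Queue<String> buffered = new ArrayDeque<>();
    private final int maxPollRecords;

    MaxRecordsPoll(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    // Simulates records arriving from a fetch response.
    void buffer(List<String> fetched) {
        buffered.addAll(fetched);
    }

    // Returns min(buffered, maxPollRecords) records; never blocks for more.
    List<String> poll() {
        List<String> out = new ArrayList<>();
        while (out.size() < maxPollRecords && !buffered.isEmpty()) {
            out.add(buffered.poll());
        }
        return out;
    }
}
```

With `maxPollRecords` set to 1, every processed record sends the loop back through poll(), which is the heartbeat-per-record equivalence Jason describes above.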
> >>
> >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira wrote:
> >>
> >> > 1. Agree that TCP-window-style scaling would be cool. I'll try to
> >> > think of a good excuse to use it ;)
> >> >
> >> > 2. I'm very concerned about the challenges of getting the timeouts,
> >> > heartbeats and max messages right.
> >> >
> >> > Another option could be to expose a "heartbeat" API to consumers. If
> >> > my app is still processing data but is still alive, it could initiate
> >> > a heartbeat to signal it's alive without having to handle additional
> >> > messages.
> >> >
> >> > I don't know if this improves more than it complicates though :(
> >> >
> >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson wrote:
> >> >
> >> > > Hey Gwen,
> >> > >
> >> > > I was thinking along the lines of TCP window scaling in order to
> >> > > dynamically find a good consumption rate. Basically you'd start off
> >> > > consuming, say, 100 records, and you'd let it increase until the
> >> > > consumption took longer than half the session timeout (for example).
> >> > > You /might/ be able to achieve the same thing using pause/resume,
> >> > > but it would be a lot trickier since you have to do it at the
> >> > > granularity of partitions. But yeah, database write performance
> >> > > doesn't always scale in a predictable enough way to accommodate
> >> > > this, so I'm not sure how useful it would be in practice. It might
> >> > > also be more difficult to implement since it wouldn't be as clear
> >> > > when to initiate the next fetch. With a static setting, the consumer
> >> > > knows exactly how many records will be returned on the next call to
> >> > > poll() and can send fetches accordingly.
> >> > >
> >> > > On the other hand, I do feel a little wary of the need to tune the
> >> > > session timeout and max messages, since these settings might depend
> >> > > on the environment that the consumer is deployed in.
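Jason's TCP-window analogy can be sketched as a small controller that grows the per-poll record limit while batches finish quickly and halves it once a batch takes longer than half the session timeout. This is a hypothetical sketch of the idea, not anything in the consumer; the doubling rule and the half-timeout threshold are illustrative assumptions:

```java
// Sketch of the TCP-window-style idea: grow the record limit while
// processing stays fast, and back off sharply (like TCP on packet loss)
// once a batch gets too close to the session timeout. All constants and
// names here are illustrative.
class BatchWindowScaler {
    private final long sessionTimeoutMs;
    private int maxRecords;

    BatchWindowScaler(long sessionTimeoutMs, int initialMaxRecords) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.maxRecords = initialMaxRecords;
    }

    int maxRecords() {
        return maxRecords;
    }

    // Call after each poll/process cycle with the observed processing time.
    void record(long processingTimeMs) {
        if (processingTimeMs > sessionTimeoutMs / 2) {
            // Too close to the session timeout: halve the window.
            maxRecords = Math.max(1, maxRecords / 2);
        } else {
            // Comfortable margin: grow until processing slows down.
            maxRecords *= 2;
        }
    }
}
```

As Jason notes, the tricky part is not this control loop but prefetching: with a dynamic limit the consumer no longer knows ahead of time how many records the next poll() must return, so it is less clear when to initiate the next fetch.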
> >> > > It wouldn't be a big deal if the impact was relatively minor, but
> >> > > getting them wrong can cause a lot of rebalance churn, which could
> >> > > keep the consumer from making any progress. It's not a particularly
> >> > > graceful failure.
> >> > >
> >> > > -Jason
> >> > >
> >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira wrote:
> >> > >
> >> > > > I can't speak to all use cases, but for the database one, I think
> >> > > > pause/resume will be necessary in any case, and therefore dynamic
> >> > > > batch sizes are not needed.
> >> > > >
> >> > > > Databases are really unpredictable regarding response times - load
> >> > > > and locking can affect this. I'm not sure there's a good way to
> >> > > > know you are going into rebalance hell before it is too late. So
> >> > > > if I were writing code that updates an RDBMS based on Kafka, I'd
> >> > > > pick a reasonable batch size (say 5000 records), and basically
> >> > > > pause, batch-insert all records, commit and resume.
> >> > > >
> >> > > > Does that make sense?
> >> > > >
> >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <jason@confluent.io>
> >> > > > wrote:
> >> > > >
> >> > > > > Gwen and Ismael,
> >> > > > >
> >> > > > > I agree the configuration option is probably the way to go, but
> >> > > > > I was wondering whether there would be cases where it made sense
> >> > > > > to let the consumer dynamically set max messages to adjust for
> >> > > > > downstream slowness. For example, if the consumer is writing
> >> > > > > consumed records to another database, and that database is
> >> > > > > experiencing heavier than expected load, then the consumer could
> >> > > > > halve its current max messages in order to adapt without risking
> >> > > > > rebalance hell.
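Gwen's pattern above (pick a fixed batch size, then pause, batch-insert, commit, resume) can be sketched with the Kafka calls modeled as plain functions, so the control flow is visible without a broker. The 5000-record threshold follows her example; the method names and everything else are illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Sketch of the fixed-batch pattern: accumulate up to batchSize records,
// then pause(), batch-insert, commit(), resume(). The consumer operations
// are injected as functions purely for illustration.
class PauseBatchCommitLoop {
    static int runOnce(Supplier<List<String>> poll,
                       Runnable pause,
                       Consumer<List<String>> batchInsert,
                       Runnable commit,
                       Runnable resume,
                       int batchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize) {
            List<String> records = poll.get();
            if (records.isEmpty()) break;  // nothing more buffered right now
            batch.addAll(records);
        }
        pause.run();                 // stop record delivery during the slow write
        batchInsert.accept(batch);   // single batch insert into the RDBMS
        commit.run();                // commit offsets only after the insert succeeds
        resume.run();
        return batch.size();
    }
}
```

In the real 0.9 consumer, pausing the assigned partitions lets the application keep calling poll() (and therefore keep heartbeating) while the slow insert runs, without receiving further records.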
> >> > > > > It could then increase max messages as the load on the database
> >> > > > > decreases. It's basically an easier way to handle flow control
> >> > > > > than we provide with pause/resume.
> >> > > > >
> >> > > > > -Jason
> >> > > > >
> >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira wrote:
> >> > > > >
> >> > > > > > The wiki you pointed to is no longer maintained and fell out
> >> > > > > > of sync with the code and protocol.
> >> > > > > >
> >> > > > > > You may want to refer to:
> >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >> > > > > >
> >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.rantil@tink.se>
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi guys,
> >> > > > > > >
> >> > > > > > > I realized I never thanked y'all for your input - thanks!
> >> > > > > > > Jason: I apologize for assuming your stance on the issue!
> >> > > > > > > Feels like we all agreed on the solution. +1
> >> > > > > > >
> >> > > > > > > Follow-up: Jason made a point about defining prefetch and
> >> > > > > > > fairness behaviour in the KIP. I am now working on putting
> >> > > > > > > that down in writing. To be able to do this, I think I need
> >> > > > > > > to understand the current prefetch behaviour in the new
> >> > > > > > > consumer API (0.9) a bit better. Some specific questions:
> >> > > > > > >
> >> > > > > > > - How does a specific consumer balance incoming messages
> >> > > > > > > from multiple partitions? Is the consumer simply issuing
> >> > > > > > > multi-fetch requests[1] for the assigned partitions of the
> >> > > > > > > relevant topics?
> >> > > > > > > Or is the consumer fetching from one partition at a time
> >> > > > > > > and balancing between them internally? That is, is the
> >> > > > > > > responsibility for partition balancing (and fairness) on the
> >> > > > > > > broker side or the consumer side?
> >> > > > > > > - Is the above documented somewhere?
> >> > > > > > >
> >> > > > > > > [1] https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka,
> >> > > > > > > see "Multi-Fetch".
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Jens
> >> > > > > > >
> >> > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ismael@juma.me.uk>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <gwen@confluent.io>
> >> > > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Given the background, it sounds like you'll generally
> >> > > > > > > > > want each call to poll() to return the same number of
> >> > > > > > > > > events (which is the number you planned on having enough
> >> > > > > > > > > memory / time for). It also sounds like tuning the
> >> > > > > > > > > number of events will be closely tied to tuning the
> >> > > > > > > > > session timeout. That is - if I choose to lower the
> >> > > > > > > > > session timeout for some reason, I will have to modify
> >> > > > > > > > > the number of records returned too.
> >> > > > > > > > >
> >> > > > > > > > > If those assumptions are correct, I think a
> >> > > > > > > > > configuration makes more sense.
> >> > > > > > > > > 1.
> >> > > > > > > > > We are unlikely to want this parameter to change during
> >> > > > > > > > > the lifetime of the consumer.
> >> > > > > > > > > 2. The correct value is tied to another configuration
> >> > > > > > > > > parameter, so they will be controlled together.
> >> > > > > > > >
> >> > > > > > > > I was thinking the same thing.
> >> > > > > > > >
> >> > > > > > > > Ismael
> >> > > > > > >
> >> > > > > > > --
> >> > > > > > > Jens Rantil
> >> > > > > > > Backend engineer
> >> > > > > > > Tink AB
> >> > > > > > >
> >> > > > > > > Email: jens.rantil@tink.se
> >> > > > > > > Phone: +46 708 84 18 32
> >> > > > > > > Web: www.tink.se
> >> > > > > > >
> >> > > > > > > Facebook / LinkedIn
> >> > > > > > > <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> >> > > > > > > / Twitter
> >>
> >> --
> >> Cliff Rhyne
> >> Software Engineering Lead
> >> e: crhyne@signal.co
> >> signal.co
> >> ________________________
> >>
> >> Cut Through the Noise
> >>
> >> This e-mail and any files transmitted with it are for the sole use of
> >> the intended recipient(s) and may contain confidential and privileged
> >> information. Any unauthorized use of this email is strictly prohibited.
> >> ©2015 Signal. All rights reserved.