From: Jason Gustafson
To: dev@kafka.apache.org
Subject: Re: KIP-41: KafkaConsumer Max Records
Date: Fri, 8 Jan 2016 09:42:34 -0800
Mailing-List: contact dev-help@kafka.apache.org; run by ezmlm
Reply-To: dev@kafka.apache.org

Hi Aarti,

Thanks for the feedback.
I think the concern about memory overhead is valid. As Guozhang mentioned,
the problem already exists in the current consumer, so this probably
deserves consideration outside of this KIP. That said, it's a good question
whether our prefetching strategy makes it more difficult to control the
memory overhead. The approach we've proposed for prefetching is basically
the following: fetch all partitions whenever the number of retained
messages is less than max.poll.records. In the worst case, this increases
the maximum memory used by the consumer by the size of those retained
messages. As you've pointed out, messages could be very large. We could
reduce this requirement with a slight change: instead of fetching all
partitions, we could fetch only those with no retained data. That would
reduce the worst-case overhead to (number of partitions) *
max.partition.fetch.bytes, which matches the existing memory overhead.
Would that address your concern?

A couple of other points are worth mentioning. Users have the option not to
use max.poll.records, in which case the behavior will be the same as in the
current consumer. Additionally, the implementation can be changed over time
without affecting users, so we can adjust it, in particular when we address
memory concerns in KAFKA-2045.

On a side note, I'm wondering if it would be useful to extend this KIP to
include a max.poll.bytes? For some use cases, it may make more sense to
control the processing time by the size of data instead of the number of
records. Not that I'm anxious to draw this out, but if we'll need this
setting eventually, we may as well do it now. Thoughts?

-Jason

On Fri, Jan 8, 2016 at 1:03 AM, Jens Rantil wrote:

> Hi,
>
> I just publicly wanted to thank Jason for the work he's done with the KIP
> and say that I've been in touch with him privately back and forth to work
> out some of its details. Thanks!
>
> Since it feels like I initiated this KIP a bit, I also want to say that I'm
> happy with it and that its proposal solves the initial issue I reported in
> https://issues.apache.org/jira/browse/KAFKA-2986. That said, I'm open for a
> [VOTE] on my behalf. I propose Jason decides when voting starts.
>
> Cheers and keep up the good work,
> Jens
>
> On Tue, Jan 5, 2016 at 8:32 PM, Jason Gustafson wrote:
>
> > I've updated the KIP with some implementation details. I also added more
> > discussion on the heartbeat() alternative. The short answer for why we
> > rejected this API is that it doesn't seem to work well with offset
> > commits. This would tend to make correct usage complicated and difficult
> > to explain. Additionally, we don't see any clear advantages over having a
> > way to set the max records. For example, using max.records=1 would be
> > equivalent to invoking heartbeat() on each iteration of the message
> > processing loop.
> >
> > Going back to the discussion on whether we should use a configuration
> > value or overload poll(), I'm leaning toward the configuration option,
> > mainly for compatibility and to keep the KafkaConsumer API from getting
> > any more complex. Also, as others have mentioned, it seems reasonable to
> > want to tune this setting in the same place that the session timeout and
> > heartbeat interval are configured. I still feel a little uncomfortable
> > with the need to do a lot of configuration tuning to get the consumer
> > working for a particular environment, but hopefully the defaults are
> > conservative enough that most users won't need to. However, if it remains
> > a problem, then we could still look into better options for managing the
> > size of batches, including overloading poll() with a max records argument
> > or possibly implementing a batch scaling algorithm internally.
> >
> > -Jason
> >
> >
> > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson wrote:
> >
> > > Hi Cliff,
> > >
> > > I think we're all agreed that the current contract of poll() should be
> > > kept. The consumer wouldn't wait for max messages to become available
> > > in this proposal; it would only ensure that it never returns more than
> > > max messages.
> > >
> > > -Jason
> > >
> > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne wrote:
> > >
> > >> Instead of a heartbeat, I'd prefer poll() to return whatever messages
> > >> the client has. Either a) I don't care if I get less than my max
> > >> message limit or b) I do care and will set a larger timeout. Case B is
> > >> less common than A and is fairly easy to handle in the application's
> > >> code.
> > >>
> > >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira wrote:
> > >>
> > >> > 1. Agree that TCP window style scaling will be cool. I'll try to
> > >> > think of a good excuse to use it ;)
> > >> >
> > >> > 2. I'm very concerned about the challenges of getting the timeouts,
> > >> > heartbeats and max messages right.
> > >> >
> > >> > Another option could be to expose a "heartbeat" API to consumers. If
> > >> > my app is still processing data but is still alive, it could initiate
> > >> > a heartbeat to signal it's alive without having to handle additional
> > >> > messages.
> > >> >
> > >> > I don't know if this improves more than it complicates though :(
> > >> >
> > >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <jason@confluent.io>
> > >> > wrote:
> > >> >
> > >> > > Hey Gwen,
> > >> > >
> > >> > > I was thinking along the lines of TCP window scaling in order to
> > >> > > dynamically find a good consumption rate. Basically you'd start off
> > >> > > consuming say 100 records and you'd let it increase until the
> > >> > > consumption took longer than half the session timeout (for example).
> > >> > > You /might/ be able to achieve the same thing using pause/resume,
> > >> > > but it would be a lot trickier since you have to do it at the
> > >> > > granularity of partitions. But yeah, database write performance
> > >> > > doesn't always scale in a predictable enough way to accommodate
> > >> > > this, so I'm not sure how useful it would be in practice. It might
> > >> > > also be more difficult to implement since it wouldn't be as clear
> > >> > > when to initiate the next fetch. With a static setting, the consumer
> > >> > > knows exactly how many records will be returned on the next call to
> > >> > > poll() and can send fetches accordingly.
> > >> > >
> > >> > > On the other hand, I do feel a little wary of the need to tune the
> > >> > > session timeout and max messages, since these settings might depend
> > >> > > on the environment that the consumer is deployed in. It wouldn't be
> > >> > > a big deal if the impact was relatively minor, but getting them
> > >> > > wrong can cause a lot of rebalance churn which could keep the
> > >> > > consumer from making any progress. It's not a particularly graceful
> > >> > > failure.
> > >> > >
> > >> > > -Jason
> > >> > >
> > >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira wrote:
> > >> > >
> > >> > > > I can't speak to all use-cases, but for the database one, I think
> > >> > > > pause-resume will be necessary in any case, and therefore dynamic
> > >> > > > batch sizes are not needed.
> > >> > > >
> > >> > > > Databases are really unpredictable regarding response times - load
> > >> > > > and locking can affect this. I'm not sure there's a good way to
> > >> > > > know you are going into rebalance hell before it is too late.
> > >> > > > So if I were writing code that updates an RDBMS based on Kafka,
> > >> > > > I'd pick a reasonable batch size (say 5000 records), and
> > >> > > > basically pause, batch-insert all records, commit and resume.
> > >> > > >
> > >> > > > Does that make sense?
> > >> > > >
> > >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson
> > >> > > > <jason@confluent.io> wrote:
> > >> > > >
> > >> > > > > Gwen and Ismael,
> > >> > > > >
> > >> > > > > I agree the configuration option is probably the way to go, but
> > >> > > > > I was wondering whether there would be cases where it made sense
> > >> > > > > to let the consumer dynamically set max messages to adjust for
> > >> > > > > downstream slowness. For example, if the consumer is writing
> > >> > > > > consumed records to another database, and that database is
> > >> > > > > experiencing heavier than expected load, then the consumer could
> > >> > > > > halve its current max messages in order to adapt without risking
> > >> > > > > rebalance hell. It could then increase max messages as the load
> > >> > > > > on the database decreases. It's basically an easier way to
> > >> > > > > handle flow control than we provide with pause/resume.
> > >> > > > >
> > >> > > > > -Jason
> > >> > > > >
> > >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <gwen@confluent.io>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > The wiki you pointed to is no longer maintained and fell out
> > >> > > > > > of sync with the code and protocol.
> > >> > > > > >
> > >> > > > > > You may want to refer to:
> > >> > > > > >
> > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >> > > > > >
> > >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil
> > >> > > > > > <jens.rantil@tink.se> wrote:
> > >> > > > > >
> > >> > > > > > > Hi guys,
> > >> > > > > > >
> > >> > > > > > > I realized I never thanked y'all for your input - thanks!
> > >> > > > > > > Jason: I apologize for assuming your stance on the issue!
> > >> > > > > > > Feels like we all agreed on the solution. +1
> > >> > > > > > >
> > >> > > > > > > Follow-up: Jason made a point about defining prefetch and
> > >> > > > > > > fairness behaviour in the KIP. I am now working on putting
> > >> > > > > > > that down in writing. To be able to do this I think I need
> > >> > > > > > > to understand the current prefetch behaviour in the new
> > >> > > > > > > consumer API (0.9) a bit better. Some specific questions:
> > >> > > > > > >
> > >> > > > > > >    - How does a specific consumer balance incoming messages
> > >> > > > > > >    from multiple partitions? Is the consumer simply issuing
> > >> > > > > > >    Multi-Fetch requests[1] for the consumer's assigned
> > >> > > > > > >    partitions of the relevant topics? Or is the consumer
> > >> > > > > > >    fetching from one partition at a time and balancing
> > >> > > > > > >    between them internally? That is, is the responsibility
> > >> > > > > > >    of partition balancing (and fairness) on the broker side
> > >> > > > > > >    or consumer side?
> > >> > > > > > >    - Is the above documented somewhere?
> > >> > > > > > >
> > >> > > > > > > [1]
> > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> > >> > > > > > > , see "Multi-Fetch".
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > > Jens
> > >> > > > > > >
> > >> > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma
> > >> > > > > > > <ismael@juma.me.uk> wrote:
> > >> > > > > > >
> > >> > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira
> > >> > > > > > > > <gwen@confluent.io> wrote:
> > >> > > > > > > >
> > >> > > > > > > > > Given the background, it sounds like you'll generally
> > >> > > > > > > > > want each call to poll() to return the same number of
> > >> > > > > > > > > events (which is the number you planned on having
> > >> > > > > > > > > enough memory / time for). It also sounds like tuning
> > >> > > > > > > > > the number of events will be closely tied to tuning the
> > >> > > > > > > > > session timeout. That is - if I choose to lower the
> > >> > > > > > > > > session timeout for some reason, I will have to modify
> > >> > > > > > > > > the number of records returned too.
> > >> > > > > > > > >
> > >> > > > > > > > > If those assumptions are correct, I think a
> > >> > > > > > > > > configuration makes more sense.
> > >> > > > > > > > > 1. We are unlikely to want this parameter to be changed
> > >> > > > > > > > > during the lifetime of the consumer
> > >> > > > > > > > > 2. The correct value is tied to another configuration
> > >> > > > > > > > > parameter, so they will be controlled together.
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > I was thinking the same thing.
> > >> > > > > > > >
> > >> > > > > > > > Ismael
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > > > --
> > >> > > > > > > Jens Rantil
> > >> > > > > > > Backend engineer
> > >> > > > > > > Tink AB
> > >> > > > > > >
> > >> > > > > > > Email: jens.rantil@tink.se
> > >> > > > > > > Phone: +46 708 84 18 32
> > >> > > > > > > Web: www.tink.se
> > >>
> > >> --
> > >> Cliff Rhyne
> > >> Software Engineering Lead
> > >> e: crhyne@signal.co
> > >> signal.co
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.rantil@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
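
For reference, the two prefetch rules described at the top of this message
can be sketched as a small model. This is only an illustration under stated
assumptions: the function names (partitions_to_fetch,
worst_case_buffer_bytes) and the dictionary of retained record counts are
hypothetical and are not part of the Kafka consumer. Only max.poll.records
and max.partition.fetch.bytes come from the discussion itself.

```python
# Hypothetical model of the prefetch rules discussed in this thread.
# None of these names exist in the Kafka client; they are illustrative only.


def partitions_to_fetch(retained, max_poll_records, only_empty):
    """Return the partitions that the next fetch would cover.

    retained maps a partition name to the number of records that are
    already buffered (retained) for it on the consumer side.
    """
    total_retained = sum(retained.values())
    if total_retained >= max_poll_records:
        # Enough records are buffered to satisfy the next poll(); no fetch.
        return []
    if only_empty:
        # Revised rule: fetch only partitions with no retained data.
        return sorted(p for p, count in retained.items() if count == 0)
    # Originally proposed rule: fetch every assigned partition.
    return sorted(retained)


def worst_case_buffer_bytes(num_partitions, max_partition_fetch_bytes):
    """Upper bound on buffered bytes under the revised rule.

    A partition is not fetched again until its retained data is drained,
    so it holds at most one fetch response worth of data at a time.
    """
    return num_partitions * max_partition_fetch_bytes


if __name__ == "__main__":
    retained = {"t-0": 0, "t-1": 40, "t-2": 0}
    print(partitions_to_fetch(retained, 100, only_empty=False))
    print(partitions_to_fetch(retained, 100, only_empty=True))
    print(worst_case_buffer_bytes(3, 1048576))
```

With the revised rule, partition t-1 in the example is skipped because it
still holds retained records, which is what keeps the bound at (number of
partitions) * max.partition.fetch.bytes.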