Subject: Re: KIP-41: KafkaConsumer Max Records
From: Aarti Gupta
To: dev@kafka.apache.org
Date: Thu, 7 Jan 2016 17:15:07 -0800

Hi Jason,

I am concerned about how many records can be prefetched into consumer memory. Currently we control the maximum number of bytes fetched per topic partition by setting fetch.message.max.bytes, so in the worst case the memory needed for prefetched data is

    max memory = #partitions * fetch.message.max.bytes
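As a back-of-the-envelope illustration of that bound (the partition count and fetch size below are made-up numbers):

    // Hypothetical numbers; the formula is the point, not the values.
    public class PrefetchEstimate {
        public static void main(String[] args) {
            int partitions = 500;                          // assumed partitions consumed in one JVM
            long fetchMessageMaxBytes = 10L * 1024 * 1024; // 10 MB, raised to avoid stuck consumers
            long worstCase = partitions * fetchMessageMaxBytes;
            System.out.printf("worst-case prefetch: ~%d MB%n", worstCase / (1024 * 1024));
        }
    }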
However, partitions can be added dynamically, which would mean that a single process (for example, a single JVM with multiple consumers) that consumes messages from a large number of partitions may not be able to keep all the prefetched messages in memory.

Additionally, if message sizes vary widely, it is hard to correlate the maximum fetch size in bytes with the number of records returned by a poll.

We previously observed in a production setup that if the size of a message is greater than fetch.message.max.bytes, the consumer gets stuck. This encouraged us to increase fetch.message.max.bytes to a significantly larger value, which worsens the memory consumption fear described above when the number of partitions is also large.

While there may not be a single magic formula to predict the correct combination of fetch.message.max.bytes and max.poll.records, maybe we can make the prefetch algorithm a mathematical function of fetch.message.max.bytes and the number of partitions?

Thoughts?

Thanks,
aarti

An additional (unimportant) note: the link to the JIRA in the KIP is broken.

On Thu, Jan 7, 2016 at 2:37 PM, Guozhang Wang wrote:

> Thanks Jason. I think it is a good feature to add, +1.
>
> As suggested in KIP-32, we'd better keep the end state of the KIP wiki with finalized implementation details rather than leaving a list of options. I agree that for both fairness and prefetching the simpler approach would be sufficient most of the time. So could we move the other approach to "rejected"?
>
> Guozhang
>
> On Wed, Jan 6, 2016 at 6:14 PM, Gwen Shapira wrote:
>
> > I like the fair-consumption approach you chose - "pull as many records as possible from each partition in a similar round-robin fashion" - it is very intuitive and close enough to fair.
> >
> > Overall, I'm +1 on the KIP. But you'll need a formal vote :)
> >
> > On Wed, Jan 6, 2016 at 6:05 PM, Jason Gustafson wrote:
> >
> > > Thanks for the suggestion, Ismael. I updated the KIP.
> > >
> > > -Jason
> > >
> > > On Wed, Jan 6, 2016 at 6:57 AM, Ismael Juma wrote:
> > >
> > > > Thanks Jason. I read the KIP and it makes sense to me. A minor suggestion: in the "Ensuring Fair Consumption" section, there are 3 paragraphs with 2 examples (2 partitions with 100 max.poll.records and 3 partitions with 30 max.poll.records). I think you could simplify this by using one of the examples in all 3 paragraphs.
> > > >
> > > > Ismael
> > > >
> > > > On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson wrote:
> > > >
> > > > > I've updated the KIP with some implementation details. I also added more discussion on the heartbeat() alternative. The short answer for why we rejected this API is that it doesn't seem to work well with offset commits: it would tend to make correct usage complicated and difficult to explain. Additionally, we don't see any clear advantages over having a way to set the max records. For example, using max.records=1 would be equivalent to invoking heartbeat() on each iteration of the message processing loop.
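For illustration, here is roughly what that equivalence would look like from the application side once a max-records setting exists; the broker address, group, topic, and the process() helper are all hypothetical, and poll(long) is the 0.9 signature:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class OneRecordPerPoll {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
            props.put("group.id", "example-group");           // hypothetical group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // The setting this KIP proposes: with a cap of 1, every record sends
            // the loop back through poll(), which is what keeps the group session
            // alive, the same effect a separate heartbeat() call would provide.
            props.put("max.poll.records", "1");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic")); // hypothetical topic
                while (true) {
                    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                        process(record); // stand-in for slow per-record work
                    }
                }
            }
        }

        static void process(ConsumerRecord<String, String> record) {
            System.out.println(record.value());
        }
    }

The point is only that a cap of 1 forces a trip through poll(), and therefore a heartbeat, between every record.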
> > > > > Going back to the discussion on whether we should use a configuration value or overload poll(), I'm leaning toward the configuration option, mainly for compatibility and to keep the KafkaConsumer API from getting any more complex. Also, as others have mentioned, it seems reasonable to want to tune this setting in the same place that the session timeout and heartbeat interval are configured. I still feel a little uncomfortable with the need to do a lot of configuration tuning to get the consumer working for a particular environment, but hopefully the defaults are conservative enough that most users won't need to. However, if it remains a problem, then we could still look into better options for managing the size of batches, including overloading poll() with a max records argument or possibly implementing a batch scaling algorithm internally.
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <jason@confluent.io> wrote:
> > > > >
> > > > > > Hi Cliff,
> > > > > >
> > > > > > I think we're all agreed that the current contract of poll() should be kept. The consumer wouldn't wait for max messages to become available in this proposal; it would only ensure that it never returns more than max messages.
> > > > > >
> > > > > > -Jason
> > > > > >
> > > > > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne wrote:
> > > > > >
> > > > > > > Instead of a heartbeat, I'd prefer poll() to return whatever messages the client has. Either a) I don't care if I get less than my max message limit, or b) I do care and will set a larger timeout. Case B is less common than A and is fairly easy to handle in the application's code.
> > > > > > >
> > > > > > > On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira wrote:
> > > > > > >
> > > > > > > > 1. Agree that TCP-window-style scaling would be cool. I'll try to think of a good excuse to use it ;)
> > > > > > > >
> > > > > > > > 2. I'm very concerned about the challenges of getting the timeouts, heartbeats and max messages right.
> > > > > > > >
> > > > > > > > Another option could be to expose a "heartbeat" API to consumers. If my app is still busy processing data but is alive, it could initiate a heartbeat to signal that it's alive without having to handle additional messages.
> > > > > > > >
> > > > > > > > I don't know if this improves more than it complicates though :(
> > > > > > > >
> > > > > > > > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <jason@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Hey Gwen,
> > > > > > > > >
> > > > > > > > > I was thinking along the lines of TCP window scaling in order to dynamically find a good consumption rate. Basically you'd start off consuming, say, 100 records and you'd let it increase until the consumption took longer than half the session timeout (for example).
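Sketched in application code (the consumer itself has no such mechanism today), the idea might look like the following; take(), processBatch(), and every number are assumptions:

    import java.util.Collections;
    import java.util.List;

    // AIMD ("TCP window") batch sizing, sketched outside the consumer.
    public class AdaptiveBatchSize {
        static final int CEILING = 10_000;            // assumed upper bound
        static final long SESSION_TIMEOUT_MS = 30_000; // assumed session timeout

        public static void main(String[] args) {
            int maxRecords = 100; // starting window, per the example above
            while (true) {
                List<String> batch = take(maxRecords); // hypothetical: drain up to maxRecords buffered values
                long start = System.currentTimeMillis();
                processBatch(batch);                   // hypothetical downstream work, e.g. DB writes
                long elapsed = System.currentTimeMillis() - start;
                if (elapsed > SESSION_TIMEOUT_MS / 2) {
                    maxRecords = Math.max(1, maxRecords / 2);         // multiplicative decrease: back off hard
                } else {
                    maxRecords = Math.min(CEILING, maxRecords + 100); // additive increase: probe upward slowly
                }
            }
        }

        static List<String> take(int n) { return Collections.emptyList(); } // stub
        static void processBatch(List<String> batch) {}                     // stub
    }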
> > > > > > > > > You /might/ be able to achieve the same thing using pause/resume, but it would be a lot trickier since you have to do it at the granularity of partitions. But yeah, database write performance doesn't always scale in a predictable enough way to accommodate this, so I'm not sure how useful it would be in practice. It might also be more difficult to implement, since it wouldn't be as clear when to initiate the next fetch. With a static setting, the consumer knows exactly how many records will be returned on the next call to poll() and can send fetches accordingly.
> > > > > > > > >
> > > > > > > > > On the other hand, I do feel a little wary of the need to tune the session timeout and max messages, since these settings might depend on the environment that the consumer is deployed in. It wouldn't be a big deal if the impact were relatively minor, but getting them wrong can cause a lot of rebalance churn which could keep the consumer from making any progress. It's not a particularly graceful failure.
> > > > > > > > >
> > > > > > > > > -Jason
> > > > > > > > >
> > > > > > > > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <gwen@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > I can't speak to all use-cases, but for the database one, I think pause-resume will be necessary in any case, and therefore dynamic batch sizes are not needed.
> > > > > > > > > >
> > > > > > > > > > Databases are really unpredictable regarding response times - load and locking can affect this. I'm not sure there's a good way to know you are going into rebalance hell before it is too late. So if I were writing code that updates an RDBMS based on Kafka, I'd pick a reasonable batch size (say, 5000 records) and basically pause, batch-insert all records, commit and resume.
> > > > > > > > > >
> > > > > > > > > > Does that make sense?
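A sketch of that pattern against the 0.9 consumer API; batchInsert() is a hypothetical bulk write, and the consumer is assumed to be already subscribed. Pausing lets the loop keep calling poll() while the insert runs, so the session stays alive without new records arriving:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PauseBatchInsertLoop {
        // pause/resume take varargs in the 0.9 API (a Collection in later clients)
        static void runLoop(KafkaConsumer<String, String> consumer) throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(100)) {
                    buffer.add(record);
                }
                if (buffer.size() >= 5000) { // the "reasonable batch size" above
                    TopicPartition[] assigned =
                            consumer.assignment().toArray(new TopicPartition[0]);
                    consumer.pause(assigned);            // stop receiving records...
                    List<ConsumerRecord<String, String>> batch = new ArrayList<>(buffer);
                    buffer.clear();
                    Future<?> insert = executor.submit(() -> batchInsert(batch)); // hypothetical bulk write
                    while (!insert.isDone()) {
                        consumer.poll(100);              // ...but keep polling so the group session stays alive
                    }
                    insert.get();                        // surface insert failures before committing
                    consumer.commitSync();               // commit offsets only after the insert succeeded
                    consumer.resume(assigned);
                }
            }
        }

        static void batchInsert(List<ConsumerRecord<String, String>> batch) {
            // stand-in for a JDBC batch insert
        }
    }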
> > > > > >> > > > > For example, if the consumer is writing consumed recor= ds > > to > > > > > >> another > > > > > >> > > > > database, and that database is experiencing heavier th= an > > > > > expected > > > > > >> > load, > > > > > >> > > > > then the consumer could halve its current max messages > in > > > > order > > > > > to > > > > > >> > > adapt > > > > > >> > > > > without risking rebalance hell. It could then increase > max > > > > > >> messages > > > > > >> > as > > > > > >> > > > the > > > > > >> > > > > load on the database decreases. It's basically an easi= er > > way > > > > to > > > > > >> > handle > > > > > >> > > > flow > > > > > >> > > > > control than we provide with pause/resume. > > > > > >> > > > > > > > > > >> > > > > -Jason > > > > > >> > > > > > > > > > >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira < > > > > gwen@confluent.io > > > > > > > > > > > >> > > wrote: > > > > > >> > > > > > > > > > >> > > > > > The wiki you pointed to is no longer maintained and > fell > > > out > > > > > of > > > > > >> > sync > > > > > >> > > > with > > > > > >> > > > > > the code and protocol. > > > > > >> > > > > > > > > > > >> > > > > > You may want to refer to: > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Pr= otocol > > > > > >> > > > > > > > > > > >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil < > > > > > >> jens.rantil@tink.se> > > > > > >> > > > wrote: > > > > > >> > > > > > > > > > > >> > > > > > > Hi guys, > > > > > >> > > > > > > > > > > > >> > > > > > > I realized I never thanked yall for your input - > > thanks! > > > > > >> > > > > > > Jason: I apologize for assuming your stance on the > > > issue! > > > > > >> Feels > > > > > >> > > like > > > > > >> > > > we > > > > > >> > > > > > all > > > > > >> > > > > > > agreed on the solution. +1 > > > > > >> > > > > > > > > > > > >> > > > > > > Follow-up: Jason made a point about defining > prefetch > > > and > > > > > >> > fairness > > > > > >> > > > > > > behaviour in the KIP. I am now working on putting > that > > > > down > > > > > in > > > > > >> > > > writing. > > > > > >> > > > > > To > > > > > >> > > > > > > do be able to do this I think I need to understand > the > > > > > current > > > > > >> > > > prefetch > > > > > >> > > > > > > behaviour in the new consumer API (0.9) a bit > better. > > > Some > > > > > >> > specific > > > > > >> > > > > > > questions: > > > > > >> > > > > > > > > > > > >> > > > > > > - How does a specific consumer balance incoming > > > > messages > > > > > >> from > > > > > >> > > > > multiple > > > > > >> > > > > > > partitions? Is the consumer simply issuing > > > Multi-Fetch > > > > > >> > > requests[1] > > > > > >> > > > > for > > > > > >> > > > > > > the > > > > > >> > > > > > > consumed assigned partitions of the relevant > > topics? > > > Or > > > > > is > > > > > >> the > > > > > >> > > > > > consumer > > > > > >> > > > > > > fetching from one partition at a time and > balancing > > > > > between > > > > > >> > them > > > > > >> > > > > > > internally? That is, is the responsibility of > > > partition > > > > > >> > > balancing > > > > > >> > > > > (and > > > > > >> > > > > > > fairness) on the broker side or consumer side? > > > > > >> > > > > > > - Is the above documented somewhere? 
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ismael@juma.me.uk> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <gwen@confluent.io> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Given the background, it sounds like you'll generally want each call to poll() to return the same number of events (which is the number you planned on having enough memory / time for). It also sounds like tuning the number of events will be closely tied to tuning the session timeout. That is - if I choose to lower the session timeout for some reason, I will have to modify the number of records returned too.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > If those assumptions are correct, I think a configuration makes more sense.
> > > > > > > > > > > > > > > 1. We are unlikely to want this parameter to change during the lifetime of the consumer.
> > > > > > > > > > > > > > > 2. The correct value is tied to another configuration parameter, so they will be controlled together.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I was thinking the same thing.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Ismael
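To illustrate Gwen's second point, both knobs end up side by side in the consumer configuration, so they are naturally tuned together; the values are examples only, and max.poll.records is the name this KIP proposes:

    Properties props = new Properties();
    props.put("session.timeout.ms", "30000");
    // Lowering the session timeout means a full poll()'s worth of records
    // must be processed in less time, so the cap would likely come down too:
    props.put("max.poll.records", "500");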
> > > > > >> > > > > > > > > > > > > >> > > > > > > > Ismael > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > -- > > > > > >> > > > > > > Jens Rantil > > > > > >> > > > > > > Backend engineer > > > > > >> > > > > > > Tink AB > > > > > >> > > > > > > > > > > > >> > > > > > > Email: jens.rantil@tink.se > > > > > >> > > > > > > Phone: +46 708 84 18 32 > > > > > >> > > > > > > Web: www.tink.se > > > > > >> > > > > > > > > > > > >> > > > > > > Facebook > > Linkedin > > > > > >> > > > > > > < > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > > http://www.linkedin.com/company/2735919?trk=3Dvsrp_companies_res_photo&tr= kInfo=3DVSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVS= RPcmpt%3Aprimary > > > > > >> > > > > > > > > > > > > >> > > > > > > Twitter > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > > >> > > > > > >> -- > > > > > >> Cliff Rhyne > > > > > >> Software Engineering Lead > > > > > >> e: crhyne@signal.co > > > > > >> signal.co > > > > > >> ________________________ > > > > > >> > > > > > >> Cut Through the Noise > > > > > >> > > > > > >> This e-mail and any files transmitted with it are for the sole > use > > > of > > > > > the > > > > > >> intended recipient(s) and may contain confidential and > privileged > > > > > >> information. Any unauthorized use of this email is strictly > > > > prohibited. > > > > > >> =C2=A92015 Signal. All rights reserved. > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > -- Guozhang > --001a113cbd4468c1080528c851d8--