Date: Fri, 8 Jan 2016 11:50:53 -0800
Subject: Re: KIP-41: KafkaConsumer Max Records
From: Jason Gustafson
To: dev@kafka.apache.org

Thanks Jens for all of your work as well!
Unless there are any more concerns, perhaps we can open the vote early next week.

As a quick summary for newcomers to this thread, the problem we're trying to solve in this KIP is how to give users more predictable control over the message processing loop. Because the new consumer is single-threaded, poll() must be called frequently enough for the consumer to send heartbeats before its session timeout expires. Typically we recommend setting the session timeout large enough to make expiration unlikely. That advice can be difficult to follow in practice when the number of partitions, and therefore the amount of data a single poll() can return, is unknown or grows over time. In some cases, such as in Jens' initial bug report, the processing time does not even depend directly on the total size of the data to be processed.

To address this problem, we have proposed a new configuration option, "max.poll.records", which sets an upper bound on the number of records returned in a single call to poll(). The point is to give users a way to limit message processing time so that the session timeout can be set without risking unexpected rebalances. This change is backward compatible with the current API, and users only need to change their configuration to take advantage of it. As a bonus, it provides an easy mechanism for implementing commit policies that guarantee a commit at least once every N records.

As a final subject for consideration, it may make sense to also add a configuration "max.poll.bytes", which places an upper bound on the total size of the data returned in a call to poll(). This would solve the problem more generally, since in some use cases processing time depends more on the total size of the data than on the number of records. Others might require a mix of the two.

-Jason

On Fri, Jan 8, 2016 at 9:42 AM, Jason Gustafson wrote:

> Hi Aarti,
>
> Thanks for the feedback. I think the concern about memory overhead is valid.
> As Guozhang mentioned, the problem already exists in the current consumer, so this probably deserves consideration outside of this KIP. That said, it's a good question whether our prefetching strategy makes it more difficult to control the memory overhead. The approach we've proposed for prefetching is basically the following: fetch all partitions whenever the number of retained messages is less than max.poll.records. In the worst case, this increases the maximum memory used by the consumer by the size of those retained messages. As you've pointed out, messages could be very large. We could reduce this requirement with a slight change: instead of fetching all partitions, we could fetch only those with no retained data. That would reduce the worst-case overhead to (number of partitions) * max.partition.fetch.bytes, which matches the existing memory overhead. Would that address your concern?
>
> A couple of other points are worth mentioning. Users have the option not to use max.poll.records, in which case the behavior will be the same as in the current consumer. Additionally, the implementation can be changed over time without affecting users, so we can adjust it, in particular when we address memory concerns in KAFKA-2045.
>
> On a side note, I'm wondering if it would be useful to extend this KIP to include a max.poll.bytes. For some use cases, it may make more sense to control the processing time by the size of the data instead of the number of records. Not that I'm anxious to draw this out, but if we'll need this setting eventually, we may as well do it now. Thoughts?
>
> -Jason
>
> On Fri, Jan 8, 2016 at 1:03 AM, Jens Rantil wrote:
>
>> Hi,
>>
>> I wanted to publicly thank Jason for the work he's done on the KIP and say that I've been in touch with him privately, back and forth, to work out some of its details. Thanks!
>>
>> Since I feel I more or less initiated this KIP, I also want to say that I'm happy with it and that its proposal solves the initial issue I reported in https://issues.apache.org/jira/browse/KAFKA-2986. That said, I'm open to a [VOTE] on my behalf. I propose that Jason decides when voting starts.
>>
>> Cheers and keep up the good work,
>> Jens
>>
>> On Tue, Jan 5, 2016 at 8:32 PM, Jason Gustafson wrote:
>>
>> > I've updated the KIP with some implementation details. I also added more discussion on the heartbeat() alternative. The short answer for why we rejected this API is that it doesn't seem to work well with offset commits, which would tend to make correct usage complicated and difficult to explain. Additionally, we don't see any clear advantages over having a way to set the max records. For example, using max.records=1 would be equivalent to invoking heartbeat() on each iteration of the message processing loop.
>> >
>> > Going back to the discussion on whether we should use a configuration value or overload poll(), I'm leaning toward the configuration option, mainly for compatibility and to keep the KafkaConsumer API from getting any more complex. Also, as others have mentioned, it seems reasonable to want to tune this setting in the same place that the session timeout and heartbeat interval are configured. I still feel a little uncomfortable with the need to do a lot of configuration tuning to get the consumer working for a particular environment, but hopefully the defaults are conservative enough that most users won't need to. However, if it remains a problem, then we could still look into better options for managing the size of batches, including overloading poll() with a max records argument or possibly implementing a batch scaling algorithm internally.
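To make the max.poll.records contract concrete, here is a toy simulation rather than real client code. SimulatedConsumer and run_loop are hypothetical names; only the setting name max.poll.records comes from this thread. The sketch assumes poll() returns whatever is buffered, capped at the limit, without waiting for the cap to fill. Committing after each batch then gives the "commit at least every N records" policy mentioned in the summary at the top of this message.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class SimulatedConsumer:
    """Hypothetical stand-in for a consumer; this is NOT the KafkaConsumer API."""
    max_poll_records: int  # plays the role of the proposed max.poll.records
    buffered: deque = field(default_factory=deque)  # offsets already fetched
    committed: int = 0  # next offset to consume, as a commit would store it

    def poll(self) -> list:
        # Return what is buffered, capped at max_poll_records; never wait for more.
        batch = []
        while self.buffered and len(batch) < self.max_poll_records:
            batch.append(self.buffered.popleft())
        return batch

    def commit(self, next_offset: int) -> None:
        self.committed = next_offset


def run_loop(consumer: SimulatedConsumer, process) -> list:
    """Process each batch, then commit; return the committed offset after each batch."""
    history = []
    while True:
        batch = consumer.poll()
        if not batch:
            # The simulation stops once its finite buffer is drained; a real
            # loop would keep calling poll() so heartbeats continue.
            break
        for offset in batch:
            process(offset)
        # One commit per batch means a commit at least every max_poll_records records.
        consumer.commit(batch[-1] + 1)
        history.append(consumer.committed)
    return history


print(run_loop(SimulatedConsumer(4, deque(range(10))), process=lambda offset: None))
# [4, 8, 10]
```

With max_poll_records=1, each poll() hands back a single record, so control returns to poll() after every record. That is the equivalence Jason draws between max.records=1 and calling heartbeat() on each iteration.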
>> >
>> > -Jason
>> >
>> > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson wrote:
>> >
>> > > Hi Cliff,
>> > >
>> > > I think we're all agreed that the current contract of poll() should be kept. The consumer wouldn't wait for max messages to become available in this proposal; it would only ensure that it never returns more than max messages.
>> > >
>> > > -Jason
>> > >
>> > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne wrote:
>> > >
>> > >> Instead of a heartbeat, I'd prefer poll() to return whatever messages the client has. Either a) I don't care if I get fewer than my max message limit, or b) I do care and will set a larger timeout. Case B is less common than A and is fairly easy to handle in the application's code.
>> > >>
>> > >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira wrote:
>> > >>
>> > >> > 1. Agree that TCP window style scaling will be cool. I'll try to think of a good excuse to use it ;)
>> > >> >
>> > >> > 2. I'm very concerned about the challenges of getting the timeouts, heartbeats and max messages right.
>> > >> >
>> > >> > Another option could be to expose a "heartbeat" API to consumers. If my app is still processing data but is still alive, it could initiate a heartbeat to signal it's alive without having to handle additional messages.
>> > >> >
>> > >> > I don't know if this improves more than it complicates though :(
>> > >> >
>> > >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <jason@confluent.io> wrote:
>> > >> >
>> > >> > > Hey Gwen,
>> > >> > >
>> > >> > > I was thinking along the lines of TCP window scaling in order to dynamically find a good consumption rate.
>> > >> > > Basically you'd start off consuming, say, 100 records and you'd let it increase until the consumption took longer than half the session timeout (for example). You /might/ be able to achieve the same thing using pause/resume, but it would be a lot trickier since you have to do it at the granularity of partitions. But yeah, database write performance doesn't always scale in a predictable enough way to accommodate this, so I'm not sure how useful it would be in practice. It might also be more difficult to implement since it wouldn't be as clear when to initiate the next fetch. With a static setting, the consumer knows exactly how many records will be returned on the next call to poll() and can send fetches accordingly.
>> > >> > >
>> > >> > > On the other hand, I do feel a little wary of the need to tune the session timeout and max messages, though, since these settings might depend on the environment that the consumer is deployed in. It wouldn't be a big deal if the impact was relatively minor, but getting them wrong can cause a lot of rebalance churn, which could keep the consumer from making any progress. It's not a particularly graceful failure.
>> > >> > >
>> > >> > > -Jason
>> > >> > >
>> > >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira wrote:
>> > >> > >
>> > >> > > > I can't speak to all use-cases, but for the database one, I think pause-resume will be necessary in any case, and therefore dynamic batch sizes are not needed.
>> > >> > > >
>> > >> > > > Database response times are really unpredictable; load and locking can affect them. I'm not sure there's a good way to know you are going into rebalance hell before it is too late. So if I were writing code that updates an RDBMS based on Kafka, I'd pick a reasonable batch size (say 5000 records), and basically pause, batch-insert all records, commit and resume.
>> > >> > > >
>> > >> > > > Does that make sense?
>> > >> > > >
>> > >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <jason@confluent.io> wrote:
>> > >> > > >
>> > >> > > > > Gwen and Ismael,
>> > >> > > > >
>> > >> > > > > I agree the configuration option is probably the way to go, but I was wondering whether there would be cases where it made sense to let the consumer dynamically set max messages to adjust for downstream slowness. For example, if the consumer is writing consumed records to another database, and that database is experiencing heavier-than-expected load, then the consumer could halve its current max messages in order to adapt without risking rebalance hell. It could then increase max messages as the load on the database decreases. It's basically an easier way to handle flow control than what we provide with pause/resume.
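Jason's adaptive idea in the two messages above has three parts: start around 100 records, let the limit grow while batches finish within half the session timeout, and halve it under downstream load. That resembles TCP's additive-increase/multiplicative-decrease (AIMD) rule. The sketch is one hypothetical way to express it. AdaptiveBatchSize, its step size, floor and ceiling are illustrative assumptions, not part of the KIP or of any Kafka API.

```python
from dataclasses import dataclass


@dataclass
class AdaptiveBatchSize:
    """Hypothetical AIMD controller for a per-poll record limit (not a Kafka API)."""
    session_timeout_ms: float
    limit: int = 100               # starting point suggested in the thread
    step: int = 100                # additive increase after a fast batch
    floor: int = 1
    ceiling: int = 10_000
    budget_fraction: float = 0.5   # "half the session timeout"

    def record_batch(self, elapsed_ms: float) -> int:
        """Update the limit from how long the last batch took; return the new limit."""
        budget_ms = self.session_timeout_ms * self.budget_fraction
        if elapsed_ms > budget_ms:
            # Downstream is slow: back off multiplicatively.
            self.limit = max(self.floor, self.limit // 2)
        else:
            # Batch finished comfortably: probe upward additively.
            self.limit = min(self.ceiling, self.limit + self.step)
        return self.limit


sizer = AdaptiveBatchSize(session_timeout_ms=30_000)
print([sizer.record_batch(ms) for ms in [2_000, 3_000, 20_000, 4_000]])
# [200, 300, 150, 250]
```

Jason also names the cost of this approach. With a static setting, the consumer knows exactly how many records the next poll() will return and can send fetches accordingly. An adaptive limit makes that harder.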
>> > >> > > > >
>> > >> > > > > -Jason
>> > >> > > > >
>> > >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <gwen@confluent.io> wrote:
>> > >> > > > >
>> > >> > > > > > The wiki you pointed to is no longer maintained and has fallen out of sync with the code and protocol.
>> > >> > > > > >
>> > >> > > > > > You may want to refer to:
>> > >> > > > > >
>> > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>> > >> > > > > >
>> > >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.rantil@tink.se> wrote:
>> > >> > > > > >
>> > >> > > > > > > Hi guys,
>> > >> > > > > > >
>> > >> > > > > > > I realized I never thanked you all for your input. Thanks!
>> > >> > > > > > > Jason: I apologize for assuming your stance on the issue! It feels like we all agreed on the solution. +1
>> > >> > > > > > >
>> > >> > > > > > > Follow-up: Jason made a point about defining prefetch and fairness behaviour in the KIP. I am now working on putting that down in writing. To be able to do this, I think I need to understand the current prefetch behaviour in the new consumer API (0.9) a bit better. Some specific questions:
>> > >> > > > > > >
>> > >> > > > > > >   - How does a specific consumer balance incoming messages from multiple partitions? Is the consumer simply issuing Multi-Fetch requests[1] for the assigned partitions of the relevant topics?
>> Or >> > is >> > >> the >> > >> > > > > > consumer >> > >> > > > > > > fetching from one partition at a time and balancing >> > between >> > >> > them >> > >> > > > > > > internally? That is, is the responsibility of >> partition >> > >> > > balancing >> > >> > > > > (and >> > >> > > > > > > fairness) on the broker side or consumer side? >> > >> > > > > > > - Is the above documented somewhere? >> > >> > > > > > > >> > >> > > > > > > [1] >> > >> > > > > > > >> > >> > > > > > > >> > >> > > > > > >> > >> > > > > >> > >> > > > >> > >> > > >> > >> > >> > >> >> > >> https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+K= afka >> > >> > > > > > > , >> > >> > > > > > > see "Multi-Fetch". >> > >> > > > > > > >> > >> > > > > > > Thanks, >> > >> > > > > > > Jens >> > >> > > > > > > >> > >> > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma < >> > >> ismael@juma.me.uk> >> > >> > > > > wrote: >> > >> > > > > > > >> > >> > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira < >> > >> > gwen@confluent.io >> > >> > > > >> > >> > > > > > wrote: >> > >> > > > > > > > >> > >> > > > > > > > > Given the background, it sounds like you'll general= ly >> > want >> > >> > each >> > >> > > > > call >> > >> > > > > > to >> > >> > > > > > > > > poll() to return the same number of events (which i= s >> the >> > >> > number >> > >> > > > you >> > >> > > > > > > > planned >> > >> > > > > > > > > on having enough memory / time for). It also sounds >> like >> > >> > tuning >> > >> > > > the >> > >> > > > > > > > number >> > >> > > > > > > > > of events will be closely tied to tuning the sessio= n >> > >> timeout. >> > >> > > > That >> > >> > > > > > is - >> > >> > > > > > > > if >> > >> > > > > > > > > I choose to lower the session timeout for some >> reason, I >> > >> will >> > >> > > > have >> > >> > > > > to >> > >> > > > > > > > > modify the number of records returning too. 
>> > >> > > > > > > > >
>> > >> > > > > > > > > If those assumptions are correct, I think a configuration makes more sense:
>> > >> > > > > > > > > 1. We are unlikely to want this parameter to change during the lifetime of the consumer.
>> > >> > > > > > > > > 2. The correct value is tied to another configuration parameter, so they will be controlled together.
>> > >> > > > > > > >
>> > >> > > > > > > > I was thinking the same thing.
>> > >> > > > > > > >
>> > >> > > > > > > > Ismael
>> > >> > > > > > >
>> > >> > > > > > > --
>> > >> > > > > > > Jens Rantil
>> > >> > > > > > > Backend engineer
>> > >> > > > > > > Tink AB
>> > >> > > > > > >
>> > >> > > > > > > Email: jens.rantil@tink.se
>> > >> > > > > > > Phone: +46 708 84 18 32
>> > >> > > > > > > Web: www.tink.se
>> > >>
>> > >> --
>> > >> Cliff Rhyne
>> > >> Software Engineering Lead
>> > >> e: crhyne@signal.co
>> > >> signal.co
Any unauthorized use of this email is strictly >> prohibited. >> > >> =C2=A92015 Signal. All rights reserved. >> > >> >> > > >> > > >> > >> >> >> >> -- >> Jens Rantil >> Backend engineer >> Tink AB >> >> Email: jens.rantil@tink.se >> Phone: +46 708 84 18 32 >> Web: www.tink.se >> >> Facebook Linkedin >> < >> http://www.linkedin.com/company/2735919?trk=3Dvsrp_companies_res_photo&t= rkInfo=3DVSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CV= SRPcmpt%3Aprimary >> > >> Twitter >> > > --001a114106fab5525e0528d7e7b6--