Subject: Re: KIP-41: KafkaConsumer Max Records
From: Jason Gustafson <jason@confluent.io>
To: dev@kafka.apache.org
Date: Wed, 6 Jan 2016 18:05:49 -0800

Thanks for the suggestion, Ismael. I updated the KIP.
-Jason

On Wed, Jan 6, 2016 at 6:57 AM, Ismael Juma wrote:

> Thanks Jason. I read the KIP and it makes sense to me. A minor suggestion: in the "Ensuring Fair Consumption" section, there are 3 paragraphs with 2 examples (2 partitions with 100 max.poll.records and 3 partitions with 30 max.poll.records). I think you could simplify this by using one of the examples in the 3 paragraphs.
>
> Ismael
>
> On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson wrote:
>
> > I've updated the KIP with some implementation details. I also added more discussion on the heartbeat() alternative. The short answer for why we rejected this API is that it doesn't seem to work well with offset commits. This would tend to make correct usage complicated and difficult to explain. Additionally, we don't see any clear advantages over having a way to set the max records. For example, using max.records=1 would be equivalent to invoking heartbeat() on each iteration of the message processing loop.
> >
> > Going back to the discussion on whether we should use a configuration value or overload poll(), I'm leaning toward the configuration option mainly for compatibility and to keep the KafkaConsumer API from getting any more complex. Also, as others have mentioned, it seems reasonable to want to tune this setting in the same place that the session timeout and heartbeat interval are configured. I still feel a little uncomfortable with the need to do a lot of configuration tuning to get the consumer working for a particular environment, but hopefully the defaults are conservative enough that most users won't need to. However, if it remains a problem, then we could still look into better options for managing the size of batches including overloading poll() with a max records argument or possibly by implementing a batch scaling algorithm internally.
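[Editorial note: the point in the quoted message — that bounding the records returned by poll() bounds the time between polls, which is what keeps the consumer inside its session timeout — can be made concrete with a back-of-the-envelope sketch. Python and the helper name are purely illustrative; none of this is Kafka API.]

```python
def max_safe_records(session_timeout_ms, per_record_ms, safety_factor=0.5):
    """Largest per-poll record count that keeps one iteration of the
    poll loop under a fraction of the session timeout. An illustrative
    heuristic, not anything in the Kafka API."""
    budget_ms = session_timeout_ms * safety_factor
    return max(1, int(budget_ms // per_record_ms))

# With a 30 s session timeout and ~10 ms to process each record,
# up to 1500 records per poll leaves half the timeout spare:
print(max_safe_records(30_000, 10))  # -> 1500
```

In this framing, max.records=1 is the degenerate case: every record processed is immediately followed by a poll(), which is exactly the per-iteration heartbeat behavior described above.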
> >
> > -Jason
> >
> > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson wrote:
> >
> > > Hi Cliff,
> > >
> > > I think we're all agreed that the current contract of poll() should be kept. The consumer wouldn't wait for max messages to become available in this proposal; it would only ensure that it never returns more than max messages.
> > >
> > > -Jason
> > >
> > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne wrote:
> > >
> > >> Instead of a heartbeat, I'd prefer poll() to return whatever messages the client has. Either a) I don't care if I get less than my max message limit or b) I do care and will set a larger timeout. Case B is less common than A and is fairly easy to handle in the application's code.
> > >>
> > >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira wrote:
> > >>
> > >> > 1. Agree that TCP window style scaling will be cool. I'll try to think of a good excuse to use it ;)
> > >> >
> > >> > 2. I'm very concerned about the challenges of getting the timeouts, heartbeats and max messages right.
> > >> >
> > >> > Another option could be to expose a "heartbeat" API to consumers. If my app is still processing data but is still alive, it could initiate a heartbeat to signal it's alive without having to handle additional messages.
> > >> >
> > >> > I don't know if this improves more than it complicates though :(
> > >> >
> > >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <jason@confluent.io> wrote:
> > >> >
> > >> > > Hey Gwen,
> > >> > >
> > >> > > I was thinking along the lines of TCP window scaling in order to dynamically find a good consumption rate. Basically you'd start off consuming say 100 records and you'd let it increase until the consumption took longer than half the session timeout (for example).
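[Editorial note: the TCP-window idea Jason describes — start around 100 records, grow while batches finish quickly, back off when one takes longer than half the session timeout — might be sketched as follows. This is a hypothetical illustration, not code that was proposed for the consumer; all names and constants are made up.]

```python
class AdaptiveBatchSizer:
    """Hypothetical TCP-window-style sizing of the per-poll record limit:
    probe upward while batches finish quickly, halve when one takes
    longer than half the session timeout. Not part of the Kafka API."""

    def __init__(self, session_timeout_ms, initial=100, ceiling=10_000):
        self.threshold_ms = session_timeout_ms / 2
        self.batch = initial
        self.ceiling = ceiling

    def next_batch_size(self, last_batch_elapsed_ms):
        if last_batch_elapsed_ms > self.threshold_ms:
            self.batch = max(1, self.batch // 2)            # back off hard
        else:
            self.batch = min(self.ceiling, self.batch * 2)  # probe upward
        return self.batch

sizer = AdaptiveBatchSizer(session_timeout_ms=30_000)
print(sizer.next_batch_size(5_000))   # fast batch: grows to 200
print(sizer.next_batch_size(20_000))  # slow batch: halves back to 100
```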
> > >> > > You /might/ be able to achieve the same thing using pause/resume, but it would be a lot trickier since you have to do it at the granularity of partitions. But yeah, database write performance doesn't always scale in a predictable enough way to accommodate this, so I'm not sure how useful it would be in practice. It might also be more difficult to implement since it wouldn't be as clear when to initiate the next fetch. With a static setting, the consumer knows exactly how many records will be returned on the next call to poll() and can send fetches accordingly.
> > >> > >
> > >> > > On the other hand, I do feel a little wary of the need to tune the session timeout and max messages though since these settings might depend on the environment that the consumer is deployed in. It wouldn't be a big deal if the impact was relatively minor, but getting them wrong can cause a lot of rebalance churn which could keep the consumer from making any progress. It's not a particularly graceful failure.
> > >> > >
> > >> > > -Jason
> > >> > >
> > >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira wrote:
> > >> > >
> > >> > > > I can't speak to all use-cases, but for the database one, I think pause-resume will be necessary in any case, and therefore dynamic batch sizes are not needed.
> > >> > > >
> > >> > > > Databases are really unpredictable regarding response times - load and locking can affect this. I'm not sure there's a good way to know you are going into rebalance hell before it is too late.
> > >> > > > So if I were writing code that updates an RDBMS based on Kafka, I'd pick a reasonable batch size (say 5000 records), and basically pause, batch-insert all records, commit and resume.
> > >> > > >
> > >> > > > Does that make sense?
> > >> > > >
> > >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <jason@confluent.io> wrote:
> > >> > > >
> > >> > > > > Gwen and Ismael,
> > >> > > > >
> > >> > > > > I agree the configuration option is probably the way to go, but I was wondering whether there would be cases where it made sense to let the consumer dynamically set max messages to adjust for downstream slowness. For example, if the consumer is writing consumed records to another database, and that database is experiencing heavier than expected load, then the consumer could halve its current max messages in order to adapt without risking rebalance hell. It could then increase max messages as the load on the database decreases. It's basically an easier way to handle flow control than we provide with pause/resume.
> > >> > > > >
> > >> > > > > -Jason
> > >> > > > >
> > >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <gwen@confluent.io> wrote:
> > >> > > > >
> > >> > > > > > The wiki you pointed to is no longer maintained and fell out of sync with the code and protocol.
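[Editorial note: the pause / batch-insert / commit / resume loop Gwen describes above can be sketched as follows. FakeConsumer and FakeDb are stand-ins so the sketch is self-contained and runnable; the real consumer API is Java and differs in detail.]

```python
BATCH_SIZE = 5000

def drain_to_db(consumer, db):
    """Buffer up to BATCH_SIZE records, then pause fetching while the
    batch is inserted and offsets are committed (illustrative only)."""
    buffered = []
    while len(buffered) < BATCH_SIZE:
        records = consumer.poll(timeout_ms=100)
        if not records:
            break
        buffered.extend(records)
    if not buffered:
        return 0
    consumer.pause(consumer.assignment())  # stop fetching; session stays alive
    db.batch_insert(buffered)
    consumer.commit()                      # commit only after the insert succeeds
    consumer.resume(consumer.assignment())
    return len(buffered)

class FakeConsumer:
    """Minimal stand-in that hands out 500 records per poll."""
    def __init__(self, records):
        self._records = list(records)
        self.committed = False
    def poll(self, timeout_ms):
        batch, self._records = self._records[:500], self._records[500:]
        return batch
    def assignment(self):
        return {"demo-topic-0"}
    def pause(self, partitions):
        pass
    def resume(self, partitions):
        pass
    def commit(self):
        self.committed = True

class FakeDb:
    def __init__(self):
        self.rows = []
    def batch_insert(self, rows):
        self.rows.extend(rows)

consumer, db = FakeConsumer(range(1200)), FakeDb()
print(drain_to_db(consumer, db))  # -> 1200 (all records inserted in one paused batch)
```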
> > >> > > > > >
> > >> > > > > > You may want to refer to:
> > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >> > > > > >
> > >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.rantil@tink.se> wrote:
> > >> > > > > >
> > >> > > > > > > Hi guys,
> > >> > > > > > >
> > >> > > > > > > I realized I never thanked y'all for your input - thanks! Jason: I apologize for assuming your stance on the issue! Feels like we all agreed on the solution. +1
> > >> > > > > > >
> > >> > > > > > > Follow-up: Jason made a point about defining prefetch and fairness behaviour in the KIP. I am now working on putting that down in writing. To be able to do this I think I need to understand the current prefetch behaviour in the new consumer API (0.9) a bit better. Some specific questions:
> > >> > > > > > >
> > >> > > > > > > - How does a specific consumer balance incoming messages from multiple partitions? Is the consumer simply issuing Multi-Fetch requests[1] for the assigned partitions of the relevant topics? Or is the consumer fetching from one partition at a time and balancing between them internally? That is, is the responsibility of partition balancing (and fairness) on the broker side or consumer side?
> > >> > > > > > > - Is the above documented somewhere?
> > >> > > > > > >
> > >> > > > > > > [1] https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka, see "Multi-Fetch".
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > > Jens
> > >> > > > > > >
> > >> > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ismael@juma.me.uk> wrote:
> > >> > > > > > >
> > >> > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <gwen@confluent.io> wrote:
> > >> > > > > > > >
> > >> > > > > > > > > Given the background, it sounds like you'll generally want each call to poll() to return the same number of events (which is the number you planned on having enough memory / time for). It also sounds like tuning the number of events will be closely tied to tuning the session timeout. That is - if I choose to lower the session timeout for some reason, I will have to modify the number of records returned too.
> > >> > > > > > > > >
> > >> > > > > > > > > If those assumptions are correct, I think a configuration makes more sense.
> > >> > > > > > > > > 1. We are unlikely to want this parameter to change during the lifetime of the consumer
> > >> > > > > > > > > 2. The correct value is tied to another configuration parameter, so they will be controlled together.
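[Editorial note: Gwen's two points argue for a plain configuration entry tuned alongside the session timeout. The coupling can be illustrated with hypothetical values; `max.poll.records` is the name the KIP proposes, the other two are existing consumer settings, and the per-record cost is a made-up figure.]

```python
# Illustrative consumer settings showing the coupling described above:
# the per-poll record cap is tuned together with the session timeout.
consumer_config = {
    "session.timeout.ms": 30_000,    # existing consumer setting
    "heartbeat.interval.ms": 3_000,  # existing consumer setting
    "max.poll.records": 500,         # the setting proposed by KIP-41
}

# Sanity check of the coupling: processing a full batch (assuming ~10 ms
# per record, purely illustrative) should fit inside half the session
# timeout; lowering session.timeout.ms forces max.poll.records down too.
per_record_ms = 10
assert (consumer_config["max.poll.records"] * per_record_ms
        < consumer_config["session.timeout.ms"] / 2)
```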
> > >> > > > > > > >
> > >> > > > > > > > I was thinking the same thing.
> > >> > > > > > > >
> > >> > > > > > > > Ismael
> > >> > > > > > >
> > >> > > > > > > --
> > >> > > > > > > Jens Rantil
> > >> > > > > > > Backend engineer
> > >> > > > > > > Tink AB
> > >> > > > > > >
> > >> > > > > > > Email: jens.rantil@tink.se
> > >> > > > > > > Phone: +46 708 84 18 32
> > >> > > > > > > Web: www.tink.se
> > >> > > > > > >
> > >> > > > > > > Facebook  Linkedin
> > >> > > > > > > <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> > >> > > > > > > Twitter
> > >>
> > >> --
> > >> Cliff Rhyne
> > >> Software Engineering Lead
> > >> e: crhyne@signal.co
> > >> signal.co
> > >> ________________________
> > >>
> > >> Cut Through the Noise
> > >>
> > >> This e-mail and any files transmitted with it are for the sole use of the intended recipient(s) and may contain confidential and privileged information. Any unauthorized use of this email is strictly prohibited. ©2015 Signal. All rights reserved.