Date: Tue, 5 Jan 2016 11:32:12 -0800
From: Jason Gustafson
To: dev@kafka.apache.org
Subject: Re: KIP-41: KafkaConsumer Max Records

I've updated the KIP with some implementation details. I also added more discussion on the heartbeat() alternative. The short answer for why we rejected this API is that it doesn't seem to work well with offset commits. This would tend to make correct usage complicated and difficult to explain. Additionally, we don't see any clear advantages over having a way to set the max records. For example, using max.records=1 would be equivalent to invoking heartbeat() on each iteration of the message processing loop.

Going back to the discussion on whether we should use a configuration value or overload poll(), I'm leaning toward the configuration option, mainly for compatibility and to keep the KafkaConsumer API from getting any more complex. Also, as others have mentioned, it seems reasonable to want to tune this setting in the same place that the session timeout and heartbeat interval are configured. I still feel a little uncomfortable with the need to do a lot of configuration tuning to get the consumer working for a particular environment, but hopefully the defaults are conservative enough that most users won't need to touch them. However, if it remains a problem, we could still look into better options for managing the size of batches, including overloading poll() with a max records argument or possibly implementing a batch scaling algorithm internally.

-Jason

On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson wrote:

> Hi Cliff,
>
> I think we're all agreed that the current contract of poll() should be kept. The consumer wouldn't wait for max messages to become available in this proposal; it would only ensure that it never returns more than max messages.
>
> -Jason
>
> On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne wrote:
>
>> Instead of a heartbeat, I'd prefer poll() to return whatever messages the client has. Either a) I don't care if I get fewer than my max message limit, or b) I do care and will set a larger timeout. Case B is less common than A and is fairly easy to handle in the application's code.
>>
>> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira wrote:
>>
>>> 1. Agree that TCP-window-style scaling would be cool. I'll try to think of a good excuse to use it ;)
>>>
>>> 2. I'm very concerned about the challenges of getting the timeouts, heartbeats and max messages right.
>>>
>>> Another option could be to expose a "heartbeat" API to consumers. If my app is still busy processing data but is still alive, it could initiate a heartbeat to signal it's alive without having to handle additional messages.
>>>
>>> I don't know if this improves more than it complicates though :(
>>>
>>> On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson wrote:
>>>
>>>> Hey Gwen,
>>>>
>>>> I was thinking along the lines of TCP window scaling in order to dynamically find a good consumption rate. Basically you'd start off consuming, say, 100 records and you'd let it increase until the consumption took longer than half the session timeout (for example).
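
[For illustration only, not part of the original thread or the KIP: a rough sketch of how an application might approximate this window-style scaling today, without any new consumer setting. The local buffer, the half-session-timeout threshold, the growth step, and the config values are all assumptions, and the collection-based pause()/resume() calls shown here differ slightly from the varargs forms in the 0.9 client.]

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Properties;
import java.util.Queue;

public class WindowScalingLoop {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "window-scaling-demo");
        props.put("session.timeout.ms", "30000");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final long sessionTimeoutMs = 30000;
        int cap = 100; // start small, as in TCP slow start
        Queue<ConsumerRecord<String, String>> buffer = new ArrayDeque<>();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                // Pause fetching while we are behind so the local buffer stays bounded.
                if (buffer.size() > cap) {
                    consumer.pause(consumer.assignment());
                } else {
                    consumer.resume(consumer.assignment());
                }

                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                }

                // Process at most `cap` records between polls, timing the work.
                long start = System.currentTimeMillis();
                for (int i = 0; i < cap && !buffer.isEmpty(); i++) {
                    process(buffer.poll()); // application-specific work
                }
                long elapsed = System.currentTimeMillis() - start;

                // The feedback loop: back off sharply when processing gets close to
                // the session timeout, probe upward slowly otherwise.
                if (elapsed > sessionTimeoutMs / 2) {
                    cap = Math.max(1, cap / 2);
                } else {
                    cap += 10;
                }

                // NOTE: this commits the consumer's position, which is already past any
                // records still sitting in the local buffer; a real implementation would
                // track the offsets of processed records per partition.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for real per-record work (e.g. a database write)
    }
}

The point is only the shape of the feedback loop; the pause/resume and offset bookkeeping hints at exactly the trickiness discussed just below.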
>>>> You /might/ be able to achieve the same thing using pause/resume, but it would be a lot trickier since you have to do it at the granularity of partitions. But yeah, database write performance doesn't always scale in a predictable enough way to accommodate this, so I'm not sure how useful it would be in practice. It might also be more difficult to implement since it wouldn't be as clear when to initiate the next fetch. With a static setting, the consumer knows exactly how many records will be returned on the next call to poll() and can send fetches accordingly.
>>>>
>>>> On the other hand, I do feel a little wary of the need to tune the session timeout and max messages, though, since these settings might depend on the environment that the consumer is deployed in. It wouldn't be a big deal if the impact was relatively minor, but getting them wrong can cause a lot of rebalance churn which could keep the consumer from making any progress. It's not a particularly graceful failure.
>>>>
>>>> -Jason
>>>>
>>>> On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira wrote:
>>>>
>>>>> I can't speak to all use-cases, but for the database one, I think pause-resume will be necessary in any case, and therefore dynamic batch sizes are not needed.
>>>>>
>>>>> Databases are really unpredictable when it comes to response times - load and locking can affect this. I'm not sure there's a good way to know you are going into rebalance hell before it is too late. So if I were writing code that updates an RDBMS based on Kafka, I'd pick a reasonable batch size (say 5000 records), and basically pause, batch-insert all records, commit and resume.
>>>>>
>>>>> Does that make sense?
>>>>>
>>>>> On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <jason@confluent.io> wrote:
>>>>>
>>>>>> Gwen and Ismael,
>>>>>>
>>>>>> I agree the configuration option is probably the way to go, but I was wondering whether there would be cases where it made sense to let the consumer dynamically set max messages to adjust for downstream slowness. For example, if the consumer is writing consumed records to another database, and that database is experiencing heavier than expected load, then the consumer could halve its current max messages in order to adapt without risking rebalance hell. It could then increase max messages as the load on the database decreases. It's basically an easier way to handle flow control than we provide with pause/resume.
>>>>>>
>>>>>> -Jason
>>>>>>
>>>>>> On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira wrote:
>>>>>>
>>>>>>> The wiki you pointed to is no longer maintained and fell out of sync with the code and protocol.
>>>>>>>
>>>>>>> You may want to refer to:
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>>>>>>>
>>>>>>> On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.rantil@tink.se> wrote:
>>>>>>>
>>>>>>>> Hi guys,
>>>>>>>>
>>>>>>>> I realized I never thanked y'all for your input - thanks!
>>>>>>>> Jason: I apologize for assuming your stance on the issue! Feels like we all agreed on the solution. +1
>>>>>>>>
>>>>>>>> Follow-up: Jason made a point about defining prefetch and fairness behaviour in the KIP. I am now working on putting that down in writing. To be able to do this I think I need to understand the current prefetch behaviour in the new consumer API (0.9) a bit better. Some specific questions:
>>>>>>>>
>>>>>>>> - How does a specific consumer balance incoming messages from multiple partitions? Is the consumer simply issuing Multi-Fetch requests [1] for the assigned partitions of the relevant topics? Or is the consumer fetching from one partition at a time and balancing between them internally? That is, is the responsibility of partition balancing (and fairness) on the broker side or the consumer side?
>>>>>>>> - Is the above documented somewhere?
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
>>>>>>>> , see "Multi-Fetch".
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Jens
>>>>>>>>
>>>>>>>> On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ismael@juma.me.uk> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <gwen@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Given the background, it sounds like you'll generally want each call to poll() to return the same number of events (which is the number you planned on having enough memory / time for). It also sounds like tuning the number of events will be closely tied to tuning the session timeout. That is - if I choose to lower the session timeout for some reason, I will have to modify the number of records returned too.
>>>>>>>>>>
>>>>>>>>>> If those assumptions are correct, I think a configuration makes more sense.
>>>>>>>>>> 1. We are unlikely to want this parameter to change during the lifetime of the consumer.
>>>>>>>>>> 2. The correct value is tied to another configuration parameter, so they will be controlled together.
>>>>>>>>>
>>>>>>>>> I was thinking the same thing.
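
[For illustration only, not from the KIP text itself: a minimal poll loop showing the configuration approach being converged on above, assuming the proposed setting ends up with a name like max.poll.records. The broker address, group id, topic, and timeout values are placeholders.]

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class BoundedPollLoop {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "kip41-demo");
        props.put("session.timeout.ms", "30000");
        props.put("enable.auto.commit", "false");
        // The proposed setting: poll() never returns more than this many records.
        // A value of 1 would be roughly equivalent to a heartbeat per processed
        // record, as noted at the top of this thread.
        props.put("max.poll.records", "500");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                // Because each poll() returns a bounded number of records, the time
                // between polls (and therefore between heartbeats) stays bounded as
                // long as per-record processing time is bounded, and the limit is
                // tuned alongside session.timeout.ms as discussed above.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // application-specific processing
                }
                consumer.commitSync();
            }
        }
    }
}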
>>>>>>>>>
>>>>>>>>> Ismael
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jens Rantil
>>>>>>>> Backend engineer
>>>>>>>> Tink AB
>>>>>>>>
>>>>>>>> Email: jens.rantil@tink.se
>>>>>>>> Phone: +46 708 84 18 32
>>>>>>>> Web: www.tink.se
>>>>>>>>
>>>>>>>> Facebook Linkedin
>>>>>>>> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
>>>>>>>> Twitter
>>
>> --
>> Cliff Rhyne
>> Software Engineering Lead
>> e: crhyne@signal.co
>> signal.co
>> ________________________
>>
>> Cut Through the Noise
>>
>> This e-mail and any files transmitted with it are for the sole use of the intended recipient(s) and may contain confidential and privileged information. Any unauthorized use of this email is strictly prohibited. ©2015 Signal. All rights reserved.
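
[A final illustrative sketch, this time of the pause / batch-insert / commit / resume pattern Gwen describes earlier in the thread for writing to an RDBMS. The 5000-record batch size comes from her example; the chunked insert, the poll(0) calls used to keep heartbeats flowing during the writes, and all config values are assumptions added here, not something specified in the thread. The same caveat about pause()/resume() signatures in 0.9 versus later clients applies.]

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class PauseBatchInsertLoop {

    private static final int BATCH_SIZE = 5000; // Gwen's "reasonable batch size"
    private static final int CHUNK_SIZE = 500;  // assumed size of each DB insert

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "rdbms-writer");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        List<ConsumerRecord<String, String>> batch = new ArrayList<>(BATCH_SIZE);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    batch.add(record);
                }
                if (batch.size() < BATCH_SIZE) {
                    continue; // keep accumulating until the batch is full
                }

                // Pause all assigned partitions: poll() keeps the consumer alive in the
                // group but returns no further records while the batch is written out.
                consumer.pause(consumer.assignment());
                for (int i = 0; i < batch.size(); i += CHUNK_SIZE) {
                    int end = Math.min(i + CHUNK_SIZE, batch.size());
                    batchInsert(batch.subList(i, end)); // placeholder for a JDBC batch insert
                    consumer.poll(0); // heartbeat between chunks; returns nothing while paused
                }
                consumer.commitSync(); // commit only after all writes have succeeded
                consumer.resume(consumer.assignment());
                batch.clear();
            }
        }
    }

    private static void batchInsert(List<ConsumerRecord<String, String>> chunk) {
        // placeholder: write the chunk to the database in a single transaction
    }
}

A real implementation would also have to handle the edge cases (records returned by the heartbeat polls after a rebalance, insert failures, and so on); the sketch only shows the pause / insert / commit / resume shape.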