Subject: Re: KIP-41: KafkaConsumer Max Records
From: Jason Gustafson
To: dev@kafka.apache.org
Date: Mon, 4 Jan 2016 11:40:08 -0800

Hey Gwen,

I was thinking along the lines of TCP window scaling in order to dynamically find a good consumption
rate. Basically you'd start off consuming, say, 100 records and let the count increase until consumption took longer than, for example, half the session timeout. You /might/ be able to achieve the same thing using pause/resume, but it would be a lot trickier since you have to do it at the granularity of partitions.

But yeah, database write performance doesn't always scale predictably enough to accommodate this, so I'm not sure how useful it would be in practice. It might also be more difficult to implement, since it wouldn't be as clear when to initiate the next fetch. With a static setting, the consumer knows exactly how many records will be returned on the next call to poll() and can send fetches accordingly.

On the other hand, I do feel a little wary of the need to tune the session timeout and max messages, since these settings might depend on the environment the consumer is deployed in. It wouldn't be a big deal if the impact were relatively minor, but getting them wrong can cause a lot of rebalance churn, which could keep the consumer from making any progress. It's not a particularly graceful failure.

-Jason

On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira wrote:

> I can't speak to all use cases, but for the database one, I think
> pause/resume will be necessary in any case, and therefore dynamic batch
> sizes are not needed.
>
> Databases are really unpredictable in their response times - load and
> locking can affect this. I'm not sure there's a good way to know you are
> heading into rebalance hell before it is too late. So if I were writing
> code that updates an RDBMS based on Kafka, I'd pick a reasonable batch
> size (say 5000 records), and basically pause, batch-insert all records,
> commit, and resume.
>
> Does that make sense?
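The pause, batch-insert, commit, resume loop Gwen describes can be sketched as follows. This is a toy illustration: `FakeConsumer` and `FakeDatabase` are invented in-memory stand-ins so the flow is runnable without a broker. A real implementation would call `pause()`/`resume()` on the consumer's assigned partitions and use a real batch insert against the database.

```python
# Toy sketch of the pattern: pause, batch-insert, commit, resume.
# FakeConsumer and FakeDatabase are hypothetical stand-ins for this example.

BATCH_SIZE = 5000

class FakeConsumer:
    def __init__(self, records):
        self._records = list(records)
        self.paused = False
        self.committed = 0  # number of records committed so far

    def poll(self, max_records):
        # Return up to max_records records past the committed position.
        return self._records[self.committed:self.committed + max_records]

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False

    def commit(self, count):
        self.committed += count

class FakeDatabase:
    def __init__(self):
        self.rows = []

    def batch_insert(self, batch):
        self.rows.extend(batch)

def run(consumer, db, batch_size=BATCH_SIZE):
    while True:
        batch = consumer.poll(max_records=batch_size)
        if not batch:
            break
        # Pause fetching while doing the slow DB work, so no new data
        # piles up; commit only after the batch is safely inserted.
        consumer.pause()
        db.batch_insert(batch)
        consumer.commit(len(batch))
        consumer.resume()

consumer = FakeConsumer(range(12000))
db = FakeDatabase()
run(consumer, db, batch_size=5000)
print(len(db.rows))        # 12000
print(consumer.committed)  # 12000
```

The point of the pause is that the consumer can keep the group membership alive during the insert without pulling down more records than it has committed.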
>
> On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson wrote:
>
> > Gwen and Ismael,
> >
> > I agree the configuration option is probably the way to go, but I was
> > wondering whether there would be cases where it made sense to let the
> > consumer dynamically set max messages to adjust for downstream slowness.
> > For example, if the consumer is writing consumed records to another
> > database, and that database is experiencing heavier than expected load,
> > then the consumer could halve its current max messages in order to adapt
> > without risking rebalance hell. It could then increase max messages as
> > the load on the database decreases. It's basically an easier way to
> > handle flow control than what we provide with pause/resume.
> >
> > -Jason
> >
> > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira wrote:
> >
> > > The wiki you pointed to is no longer maintained and fell out of sync
> > > with the code and protocol.
> > >
> > > You may want to refer to:
> > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >
> > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil wrote:
> > >
> > > > Hi guys,
> > > >
> > > > I realized I never thanked y'all for your input - thanks!
> > > > Jason: I apologize for assuming your stance on the issue! Feels like
> > > > we all agreed on the solution. +1
> > > >
> > > > Follow-up: Jason made a point about defining prefetch and fairness
> > > > behaviour in the KIP. I am now working on putting that down in
> > > > writing. To be able to do this, I think I need to understand the
> > > > current prefetch behaviour in the new consumer API (0.9) a bit
> > > > better. Some specific questions:
> > > >
> > > > - How does a specific consumer balance incoming messages from
> > > >   multiple partitions? Is the consumer simply issuing Multi-Fetch
> > > >   requests[1] for the assigned partitions of the relevant topics?
Or is the consumer
> > > >   fetching from one partition at a time and balancing between them
> > > >   internally? That is, is the responsibility of partition balancing
> > > >   (and fairness) on the broker side or the consumer side?
> > > > - Is the above documented somewhere?
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> > > > , see "Multi-Fetch".
> > > >
> > > > Thanks,
> > > > Jens
> > > >
> > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma wrote:
> > > >
> > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira wrote:
> > > > >
> > > > > > Given the background, it sounds like you'll generally want each
> > > > > > call to poll() to return the same number of events (which is the
> > > > > > number you planned on having enough memory / time for). It also
> > > > > > sounds like tuning the number of events will be closely tied to
> > > > > > tuning the session timeout. That is - if I choose to lower the
> > > > > > session timeout for some reason, I will have to modify the number
> > > > > > of records returned too.
> > > > > >
> > > > > > If those assumptions are correct, I think a configuration makes
> > > > > > more sense:
> > > > > > 1. We are unlikely to want this parameter to change during the
> > > > > > lifetime of the consumer.
> > > > > > 2. The correct value is tied to another configuration parameter,
> > > > > > so they will be controlled together.
> > > > >
> > > > > I was thinking the same thing.
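The static-configuration approach the thread converges on is what KIP-41 ultimately delivered: a `max.poll.records` consumer setting (added in the 0.10 client), tuned alongside the session timeout. The sketch below shows the shape of such a configuration and a rough sanity check relating the two settings; the specific numbers and the per-record cost are illustrative assumptions, not recommendations.

```python
# Sketch of the configuration-based approach: a static max-records-per-poll
# setting tuned together with the session timeout. Values are illustrative.

consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "rdbms-sink",
    "session.timeout.ms": 30000,
    # Choose max records so that processing one batch fits comfortably
    # inside the session timeout (here: within half of it).
    "max.poll.records": 500,
}

# Rough tuning check: with an assumed worst-case per-record processing
# time, one batch must finish well before a rebalance would be triggered.
per_record_ms = 10  # assumption for this example
budget_ms = consumer_config["session.timeout.ms"] / 2
batch_ms = consumer_config["max.poll.records"] * per_record_ms
assert batch_ms <= budget_ms
print("batch budget ok")
```

This captures Gwen's point: the two parameters are coupled, so lowering the session timeout should prompt lowering the max records as well.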
> > > > >
> > > > > Ismael
> > > >
> > > > --
> > > > Jens Rantil
> > > > Backend engineer
> > > > Tink AB
> > > >
> > > > Email: jens.rantil@tink.se
> > > > Phone: +46 708 84 18 32
> > > > Web: www.tink.se
> > > >
> > > > Facebook | LinkedIn | Twitter
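Jason's TCP-window-scaling idea from the top of the thread can be sketched as an additive-increase/multiplicative-decrease (AIMD) controller on the batch size: grow the record count while a batch is processed within budget, and halve it when processing exceeds, say, half the session timeout. Nothing like this exists in the consumer API; the function and constants below are purely a hypothetical illustration of the control loop.

```python
# Hypothetical sketch of adaptive batch sizing, TCP-congestion-control
# style: additive increase while under budget, multiplicative decrease
# when a batch takes longer than half the session timeout.

SESSION_TIMEOUT_MS = 30000

def next_batch_size(current, last_batch_ms,
                    session_timeout_ms=SESSION_TIMEOUT_MS,
                    floor=100, ceiling=10000):
    budget_ms = session_timeout_ms / 2
    if last_batch_ms > budget_ms:
        # Processing ran long: halve to back away from rebalance risk.
        return max(floor, current // 2)
    # Under budget: probe upward additively for a higher sustainable rate.
    return min(ceiling, current + floor)

# Simulate a consumer whose downstream database briefly slows down.
size = 100
for batch_ms in [2000, 3000, 4000, 16000, 5000]:
    size = next_batch_size(size, batch_ms)
print(size)  # 300: grew to 400, halved to 200 on the slow batch, grew again
```

As Jason notes, the downside is that the consumer no longer knows in advance how many records the next poll() will return, which complicates prefetching; the static configuration avoids that at the cost of manual tuning.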