Subject: Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread
From: Henry Cai
To: dev@kafka.apache.org
Date: Sun, 5 Jun 2016 23:24:25 -0700

I have a question on the KIP about the long stall during ProcessorStateManager.restoreActiveState(); this can be a long stall when we need to rebuild the RocksDB state on a new node.

1. Is restoreActiveState() considered post-rebalance, since it is invoked in the application's rebalance listener?
2.
When node A is spending a long time rebuilding the state in restoreActiveState() from the previous rebalance, and a new node (node B) sends a new JoinGroup request to the coordinator, how long should the coordinator wait for node A to finish restoreActiveState() from the previous rebalance? restoreActiveState() can take more than 10 minutes for a big state.

On Sun, Jun 5, 2016 at 10:46 PM, Becket Qin wrote:

> Hi Jason,
>
> Thanks for this very useful KIP. In general I am with Guozhang on the purpose of the three timeouts:
> 1) session timeout for consumer liveness,
> 2) process timeout (or maybe we should rename it to max.poll.interval.ms) for application liveness,
> 3) rebalance timeout for faster rebalance in some failure cases.
>
> It seems the current discussion is mainly about whether we need 3) as a separate timeout or not. The current KIP proposal is to combine 2) and 3), i.e. just use the process timeout as the rebalance timeout. That means we need to either increase the rebalance timeout to adapt to the process timeout, or the reverse. It would be helpful to understand the impact of these two cases. Here are my two cents.
>
> Users who consume data from Kafka usually care either about throughput or about latency.
>
> If users care about latency, they probably care more about average latency than about 99.99th-percentile latency, which can be affected by many more common causes than consumer failure. Because all the timeouts we are discussing here only affect the 99.99th-percentile latency, I don't think they would really make a difference for latency-sensitive users.
>
> The majority of the use cases for Kafka Connect and Mirror Maker are throughput sensitive. Ewen raised a good example where Kafka Connect needs to process the previous data on rebalance and therefore requires a higher rebalance timeout than process timeout.
> This is essentially the same in Mirror Maker, where each rebalance needs to flush all the messages in the producer's accumulator. That could take some time depending on how many messages are there. In this case, we may need to increase the process timeout to make it the same as the rebalance timeout, but that is probably fine. The downside of increasing the process timeout is a longer detection time for a consumer failure. Detecting a consumer failure a little later has only limited impact, because the rest of the consumers in the same group are still working fine, so the total throughput is unlikely to drop significantly. As long as the rebalance does not take longer it should be fine. The reason we care more about how fast a rebalance can finish is that during a rebalance no consumer in the group is consuming, i.e. throughput is zero. So we want the rebalance to finish as quickly as possible.
>
> Compared with increasing the process timeout to match the rebalance timeout, it seems a more common case that a user wants a longer process timeout but a smaller rebalance timeout. I am more worried about this case, where we have to shoehorn the rebalance timeout into the process timeout. For users who care about throughput, that might cause the rebalance to take unnecessarily long. Admittedly this only matters when a consumer has a problem during a rebalance, but depending on how high the process timeout was set, the rebalance could potentially take forever, as Guozhang mentioned.
>
> I agree with Guozhang that we can start with 1) and 2) and add 3) later if needed. But adding a rebalance timeout is more involved than just adding a configuration. It also means the rebalance has to be done in the background heartbeat thread. Hence we have to synchronize rebalance and consumer.poll() like we did in the old consumer.
> Otherwise users may lose messages if auto commit is enabled, or a manual commit might fail after consumer.poll() because the partitions might have been reassigned. So having a separate rebalance timeout also potentially means a big change for users.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Jun 3, 2016 at 11:45 AM, Jason Gustafson wrote:
>
> > Hey Ewen,
> >
> > I confess your comments caught me off guard. It never occurred to me that anyone would ask for a rebalance timeout so that it could be set _larger_ than the process timeout. Even with buffered or batch processing, I would usually expect flushing before a rebalance to take no more time than a periodic flush. Otherwise, I'd probably try to see if there was some workload I could push into periodic flushes so that rebalances could complete faster. But supposing this isn't possible or practical in some cases, I'm wondering how limiting it would be in practice to have only the one timeout? I'm a little reluctant to add the additional timeout since I think most users would not have a strong need to keep a tight bound on normal processing time. (I'm also reminded that Jay mentioned he might have to dock everyone's pay 5% for each new timeout we introduce ;-)
> >
> > Thanks,
> > Jason
> >
> > On Thu, Jun 2, 2016 at 7:30 PM, Guozhang Wang wrote:
> >
> > > Hi Ewen,
> > >
> > > I think you are right, the rebalance process could potentially involve all the delayed compute / IO. More specifically, this is how I think of the rebalance process:
> > >
> > > 1. Coordinator decides to rebalance; starts ticking based on the rebalance timeout.
> > > 2. Consumer realizes a rebalance is needed when calling poll(); triggers onPartitionsRevoked().
> > > 3. Consumer sends JoinGroupRequest.
> > > 4. Coordinator sends JoinGroupResponse; starts ticking on the leader.
> > > 5.
> > > Leader computes the assignment and sends SyncGroupRequest.
> > > 6. Coordinator sends SyncGroupResponse; starts ticking on the session timeout.
> > > 7. Consumer gets the new assignment; triggers onPartitionsAssigned().
> > >
> > > In the above process: delayed compute / IO is usually done at step 2; workload initialization is usually done in step 7; and some admin work (like in Kafka Streams) is likely to be done in step 5. In the current KIP proposal, the rebalance timeout on the coordinator starts ticking at step 1 for everyone in the group and stops ticking at step 3; it starts ticking on the leader again at step 4 and stops at step 5. In this case the delayed compute / IO contained in step 2 is covered by the rebalance timeout.
> > >
> > > That being said, I think in the worst case the time to process a single record would still be similar to rebalancing, since both could result in completing all the delayed compute / IO accumulated so far. And since the "processing timeout" is meant to cover the worst case, it should still be OK?
> > >
> > > Guozhang
> > >
> > > On Thu, Jun 2, 2016 at 5:55 PM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
> > >
> > > > Jason,
> > > >
> > > > I've been thinking about this more in terms of something like Connect. I think the rebalance timeout may be a bit different from the process timeout, and even the process timeout is a bit of a misnomer.
> > > >
> > > > We sort of talk about the process timeout as if it can be an indicator of maximum processing time for a record/batch. This makes sense for a case of data-dependent load (i.e. you can only load some data from slow storage after seeing some data), where that load might be very large compared to normal processing time.
> > > > It also makes sense if you have auto commit enabled, because you need to be completely finished processing the data before calling poll() again, so the time before you call another consumer API actually reflects processing time.
> > > >
> > > > It makes less sense in cases like Streams (or any other app) that batch writes to disk, or connectors that "process" a message by enqueuing the data but won't commit offsets until the data is flushed, possibly during some other, much later iteration of processing. In this case I think processing time and rebalance time could differ significantly. During normal processing, you can potentially pipeline quite a bit, buffering up changes and flushing as needed, but only committing once flushing is complete. But rebalancing is different: you *must* finish flushing all the data or manually choose to discard it (presumably by doing something like watching for the process timeout you set and bailing early, only committing the offsets for data you've flushed). If you have lots of data built up, the cost of rebalancing could be a *lot* higher than the maximum time you would otherwise see between calls to consumer APIs to indicate processing progress.
> > > >
> > > > The thing that makes these cases different is that processing isn't actually tied to calls to the consumer API. You can queue up / pipeline / defer some of the work. (By the way, this is currently a limitation of sink connectors that I'm not thrilled about: offset commit requires a full flush, whereas some coordination with the sink connector to only require a full flush on rebalances would be much nicer, albeit more difficult for sink connectors to implement.)
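The pipelined-write pattern described in this message can be sketched with plain JDK types (all names here are hypothetical; the real code would sit behind the Kafka consumer and sink connector APIs, which are not shown): enqueuing during normal processing is cheap, while a rebalance forces a full flush whose cost grows with the buffered backlog rather than with per-record processing time.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Stdlib-only sketch of the buffer-then-flush pattern described above.
// Hypothetical class, not Connect's actual implementation.
class BufferedSink {
    private final Deque<String> buffer = new ArrayDeque<>();

    // Normal processing: cheap enqueue; offsets are not yet committable.
    void process(String record) {
        buffer.add(record);
    }

    // Rebalance path: everything buffered must be flushed before offsets
    // can be committed and the member rejoins, so the cost is proportional
    // to the backlog, not to the time spent on any single record.
    int flushAll() {
        int n = buffer.size();
        buffer.clear();
        return n;
    }

    int pending() {
        return buffer.size();
    }
}
```

This is why the time needed at rebalance can far exceed the normal gap between consumer API calls: `flushAll()` pays for every record still sitting in the buffer at once.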
> > > > -Ewen
> > > >
> > > > On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson wrote:
> > > >
> > > > > Hey Guozhang,
> > > > >
> > > > > I'm actually not too concerned about the time spent in the rebalance callback specifically. Both it and regular processing time in the poll loop will delay the rebalance and keep joined consumers idle. However, if we expose the rebalance timeout, it would give users the option to effectively disable the process timeout while still keeping a maximum bound on the rebalance time. If the consumer cannot complete its processing fast enough and rejoin, it would be evicted. This provides something like (2), since the other consumers in the group would be able to complete the rebalance and resume work while the evicted consumer would have to roll back progress. This is not too different from rebalancing in the background, which would also typically cause commit failure and rollback (though at least the consumer stays in the group).
> > > > >
> > > > > Now that I'm thinking about it more, I'm not sure this would be a great facility to depend on in practice. It might be OK if just one or two of the consumers fall out of the group during the rebalance, but if half the group is regularly getting evicted, it would be a problem. So even if we expose the rebalance timeout, the user still has to set it with some idea in mind of how long processing should take.
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Thu, Jun 2, 2016 at 2:46 PM, Guozhang Wang wrote:
> > > > >
> > > > > > Hi Jason,
> > > > > >
> > > > > > With the current usage pattern of:
> > > > > >
> > > > > > while (..)
> > > > > > {
> > > > > >   consumer.poll(/* where rebalance happens */)
> > > > > >   // process messages
> > > > > > }
> > > > > >
> > > > > > and since the rebalance is still on the caller thread, not the background thread, if the coordinator decides to rebalance while the user thread is still processing messages, there are no options: we are forced to go with 1), right? I think your / Onur's point here, which I agree with, is that by reusing the process timeout as the rebalance timeout, if the rebalance callback could take longer than processing a batch, users need to set the timeout value to the higher of the two, i.e. the callback latency, which will make detection of processing stalls less effective, right?
> > > > > >
> > > > > > As I mentioned in my previous email, I feel that this case of the callback function taking longer than processing a batch would not be frequent in practice, and the processing timeout would usually be a good upper bound on the callback function latency. If that is true, I'd suggest we keep the current proposal and not add a third timeout config to cover this case.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Thu, Jun 2, 2016 at 10:40 AM, Jason Gustafson <jason@confluent.io> wrote:
> > > > > >
> > > > > > > Hey Guozhang,
> > > > > > >
> > > > > > > I think the problem is that users may not want to sacrifice rebalance latency because of uncertainty around processing time. As soon as a rebalance begins, there are basically two choices:
> > > > > > >
> > > > > > > 1.
> > > > > > > Block the rebalance until all consumers have finished their current processing.
> > > > > > > 2. Let all consumers rebalance and "roll back" any processing that could not be committed before the rebalance completes.
> > > > > > >
> > > > > > > If you choose option (1), then you have an incentive to keep a relatively tight bound on process.timeout.ms in order to reduce the worst-case idle time during a rebalance. But if you fail to set it high enough, you'll get spurious rebalances during normal processing. I think Onur is saying that this still sort of sucks for users. On the other hand, if (2) is acceptable, then users will have more freedom to err on the high side when setting process.timeout.ms, or even disable it entirely. They will have to deal with rolling back any progress which cannot be committed after the rebalance completes, but maybe this is less of a problem for some users?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Jason
> > > > > > >
> > > > > > > On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Onur, Jason:
> > > > > > > >
> > > > > > > > Here are some thoughts about reusing the process timeout as the server-side rebalance timeout. First of all, my understanding is that:
> > > > > > > >
> > > > > > > > 1) session timeout is for detecting consumer crashes / hard failures (in this case the heartbeat thread will be dead as well, so the coordinator notices within the session timeout).
> > > > > > > > 2) process timeout is for checking the liveness of the user thread that calls the consumer and does the processing: when no consumer calls are made within the process timeout, the heartbeat thread stops working and the stall is therefore detected by the coordinator.
> > > > > > > >
> > > > > > > > 3) a potential server-side rebalance timeout would be used to detect consumer liveness during the rebalance period, in which the user thread is tied up with the poll() call and the callback function, to prevent a slow / stalled consumer in its rebalance callback from making the rebalance take forever.
> > > > > > > >
> > > > > > > > I think we generally have two cases in practice regarding 3): users either do almost nothing in the callback and hence should never stall (unless there is a long GC), or they do various external IO, for example to maintain their own state, which could take long or even cause the thread to stall. We do not need to worry too much about the former case, and for the latter case the process timeout value should usually be a good upper bound on the rebalance latency.
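The interaction of timeouts 1) and 2) can be modeled with a small stdlib-only sketch (hypothetical class and method names, not the actual Kafka client internals): the background heartbeat thread keeps the member alive only while the user thread has called poll() within the process timeout; once heartbeats stop, the coordinator evicts the member within the session timeout.

```java
import java.util.concurrent.atomic.AtomicLong;

// Stdlib-only model of the two-timeout liveness check described above.
// Hypothetical names; the real logic lives inside the Kafka consumer.
class LivenessModel {
    private final long processTimeoutMs;
    // Last time the user thread called poll(); 0 models "clock starts at 0".
    private final AtomicLong lastPollMs = new AtomicLong(0);

    LivenessModel(long processTimeoutMs) {
        this.processTimeoutMs = processTimeoutMs;
    }

    // User thread: each consumer.poll() call records progress.
    void recordPoll(long nowMs) {
        lastPollMs.set(nowMs);
    }

    // Heartbeat thread: only heartbeat while the user thread is live.
    // Once this returns false, heartbeats stop and the coordinator
    // notices within session.timeout.ms.
    boolean shouldHeartbeat(long nowMs) {
        return nowMs - lastPollMs.get() <= processTimeoutMs;
    }
}
```

The point of the model is the separation of concerns: a hard crash kills the heartbeat thread itself (session timeout fires), while a stalled user thread merely stops refreshing `lastPollMs` (process timeout fires).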
> > > > > > > > That being said, if we observe that there is indeed a common usage where 2) and 3) require very different timeout values, such that it outweighs the complexity of three timeout values, we can consider adding a third one then: it is easier to add more configs later.
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Tue, May 31, 2016 at 2:35 PM, Jason Gustafson <jason@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Hey Onur,
> > > > > > > > >
> > > > > > > > > Thanks for the detailed response. I think the problem of controlling rebalance times is the main (known) gap in the proposal as it stands.
> > > > > > > > >
> > > > > > > > > > This burden goes away if you loosen the liveness property by having a required rebalance time and optional processing time where rebalance happens in the background thread as stated in the KIP.
> > > > > > > > >
> > > > > > > > > Just to clarify, the current KIP only allows rebalances to complete in the foreground. What I suggested above in reply to Grant was that we could add a separate rebalance timeout setting; the behavior I had in mind was to let the consumer fall out of the group if the timeout is reached while the consumer is still processing.
> > > > > > > > > I was specifically trying to avoid moving the rebalance to the background thread, since this significantly increases the complexity of the implementation. We'd also have to think about compatibility a bit more. For example, what are the implications of having the rebalance listener execute in a separate thread?
> > > > > > > > >
> > > > > > > > > Putting that issue aside, I think we need to convince ourselves that a separate rebalance timeout is really necessary, since every new timeout adds some conceptual noise which all users will see. My thought in this KIP was that users who didn't want the burden of tuning the process timeout could use a relatively large value without a major impact, because group rebalances themselves will typically be infrequent. The main concern is for users who have highly variant processing times and want to ensure a tight bound on rebalance times (even if it means having to discard some processing that cannot be completed before the rebalance finishes). These users will be left trying to tune process.timeout.ms and max.poll.records, which is basically the same position they are currently in. The problem is I don't know how common this case is, so I'm not sure how it weighs against the cost of having an additional timeout that needs to be explained.
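The tuning trade-off described here can be made concrete with a hedged configuration sketch. Note that process.timeout.ms is only the name proposed in this KIP discussion (the thread also floats max.poll.interval.ms as a rename), and all values below are illustrative, not recommendations:

```properties
# Hard-failure detection stays fast: the heartbeat thread covers these.
session.timeout.ms=10000
heartbeat.interval.ms=3000

# Proposed in KIP-62 (name not final): upper bound on the time between
# poll() calls. A large value tolerates variant processing, but lengthens
# the worst-case rebalance when this same value doubles as the rebalance
# timeout.
process.timeout.ms=300000

# Cap the work per poll() so a batch reliably fits inside the timeout.
max.poll.records=500
```

This illustrates Jason's point: the last two settings are exactly the knobs a user with highly variant processing times would still be forced to tune.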
> > > > > > > > > We can always add the rebalance timeout later, but it will be tough to remove once it's there. All the same, I'm not that keen on another iteration of this problem, so if we believe this use case is common enough, then maybe we should add it now.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Jason
> > > > > > > > >
> > > > > > > > > On Sat, May 28, 2016 at 3:10 AM, Onur Karaman <onurkaraman.apache@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the KIP writeup, Jason.
> > > > > > > > > >
> > > > > > > > > > Before anything else, I just wanted to point out that it's worth mentioning the "heartbeat.interval.ms" consumer config in the KIP for completeness. Today this config only kicks in if poll is called frequently enough. A separate heartbeat thread should make this config behave more like what people would expect: a separate thread sending heartbeats at the configured interval.
> > > > > > > > > >
> > > > > > > > > > With this KIP, the relevant configs become:
> > > > > > > > > > "max.poll.records" - already exists
> > > > > > > > > > "session.timeout.ms" - already exists
> > > > > > > > > > "heartbeat.interval.ms" - already exists
> > > > > > > > > > "process.timeout.ms" - new
> > > > > > > > > >
> > > > > > > > > > After reading the KIP several times, I think it would be helpful to be more
Is it trying to make > > faster > > > > > > > > > > best/average/worst case rebalance times? Is it trying to > > make > > > > the > > > > > > > > clients > > > > > > > > > > need less configuration tuning? > > > > > > > > > > > > > > > > > > > > Also it seems that brokers probably still want to enforce > > > > minimum > > > > > > and > > > > > > > > > > maximum rebalance timeouts just as with the minimum and > > > maximum > > > > > > > session > > > > > > > > > > timeouts so DelayedJoins don't stay in purgatory > > > indefinitely. > > > > So > > > > > > > we'd > > > > > > > > > add > > > > > > > > > > new "group.min.rebalance.timeout.ms" and " > > > > > > > > group.max.rebalance.timeout.ms > > > > > > > > > " > > > > > > > > > > broker configs which again might need to be brought up in > > the > > > > > KIP. > > > > > > > > Let's > > > > > > > > > > say we add these bounds. A side-effect of having > > broker-side > > > > > bounds > > > > > > > on > > > > > > > > > > rebalance timeouts in combination with Java clients that > > > makes > > > > > > > process > > > > > > > > > > timeouts the same as rebalance timeouts is that the > broker > > > > > > > effectively > > > > > > > > > > dictates the max processing time allowed between poll > > calls. > > > > This > > > > > > > > gotcha > > > > > > > > > > exists right now with today's broker-side bounds on > session > > > > > > timeouts. > > > > > > > > So > > > > > > > > > > I'm not really convinced that the proposal gets rid of > this > > > > > > > > complication > > > > > > > > > > mentioned in the KIP. > > > > > > > > > > > > > > > > > > > > I think the main question to ask is: does the KIP > actually > > > > make a > > > > > > > > > > difference? 
> > > > > > > > > > It looks like this KIP improves rebalance times specifically when the client currently has processing times large enough to force larger session timeouts, causing heartbeat intervals to not be honored. Separating session timeouts from processing time means clients can keep their "session.timeout.ms" low so the coordinator can quickly detect process failure, and honoring a low "heartbeat.interval.ms" on the separate heartbeat thread means clients will be quickly notified of group membership and subscription changes - all without placing difficult expectations on processing time. But even so, rebalancing through the calling thread means the slowest-processing client in the group will still be the rate-limiting step for rebalance times.
> > > > > > > > > >
> > > > > > > > > > From a usability perspective, the burden still seems to be tuning the processing time to keep the "progress liveness" happy during rebalances while still having reasonable upper bounds on rebalance times. It still looks like users have to do almost exactly the same tricks as today when group membership changes due to slow processing times, even though all the consumers are alive and the topics haven't changed:
> > > > > > > > > > 1.
> > > > > > > > > > Increase the rebalance timeout to give more time for record processing (the difference compared to today is that we bump the rebalance timeout instead of the session timeout).
> > > > > > > > > > 2. Reduce the number of records handled on each iteration with max.poll.records.
> > > > > > > > > >
> > > > > > > > > > This burden goes away if you loosen the liveness property by having a required rebalance time and optional processing time where rebalance happens in the background thread as stated in the KIP.
> > > > > > > > > >
> > > > > > > > > > On Thu, May 26, 2016 at 12:40 PM, Jason Gustafson <jason@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey Grant,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the feedback. I'm definitely open to including heartbeat() in this KIP. One thing we should be clear about is what the behavior of heartbeat() should be when the group begins rebalancing. I think there are basically two options:
> > > > > > > > > > >
> > > > > > > > > > > 1. heartbeat() simply keeps heartbeating even if the group has started rebalancing.
> > > > > > > > > > > 2. heartbeat() completes the rebalance itself.
> > > > > > > > > > >
> > > > > > > > > > > With the first option, when processing takes longer than the rebalance timeout, the member will fall out of the group, which will cause an offset commit failure when it finally finishes.
> However, if processing finishes before the rebalance completes, then offsets can still be committed. On the other hand, if heartbeat() completes the rebalance itself, then you'll definitely see the offset commit failure for any records being processed. So the first option is sort of biased toward processing completion, while the latter is biased toward rebalance completion.
>
> I'm definitely not a fan of the second option since it takes away the choice to finish processing before rejoining. However, I do see some benefit in the first option if the user wants to keep rebalance time low and doesn't mind being kicked out of the group if processing takes longer during a rebalance. This may be a reasonable tradeoff since consumer groups are presumed to be stable most of the time. A better option in that case might be to expose the rebalance timeout to the user directly, since it would allow the user to use an essentially unbounded process.timeout.ms for highly variant processing while still keeping rebalance time limited. Of course, it would be another timeout for the user to understand...
> Thanks,
> Jason
>
> On Thu, May 26, 2016 at 8:19 AM, Grant Henke <ghenke@cloudera.com> wrote:
>
> > Hi Jason,
> >
> > Thanks for writing up a proposal (and a thorough one)! This is something that I had been thinking about this week too, as I have run into it more than a handful of times now.
> >
> > I like the idea of having a larger processing timeout; that timeout in unison with max.poll.records should in many cases provide a reasonable assurance that the consumer will stay alive.
> >
> > In rejected alternatives, "Add a separate API the user can call to indicate liveness" is listed. I think a heartbeat api could be added along with these new timeout configurations and used for "advanced" use cases where the processing time could be highly variant and less predictable. A place where we might use the heartbeat api in Kafka is MirrorMaker.
> >
> > Today, I have seen people trying to find ways to leverage the existing api to "force" heartbeats by:
> >
> > 1. Calling poll to get the batch of records to process
> > 2. Calling pause on all partitions
> > 3. Processing the record batch
> > 3a. While processing, periodically calling poll (which is essentially just a heartbeat, since it returns no records while paused)
> > 4. Committing offsets and un-pausing
> > 5. Repeating from 1
> >
> > Thanks,
> > Grant
> >
> > On Wed, May 25, 2016 at 6:32 PM, Jason Gustafson <jason@confluent.io> wrote:
> >
> > > Hi All,
> > >
> > > One of the persistent problems we see with the new consumer is the use of the session timeout in order to ensure progress. Whenever there is a delay in message processing which exceeds the session timeout, no heartbeats can be sent and the consumer is removed from the group. We seem to hit this problem everywhere the consumer is used (including Kafka Connect and Kafka Streams) and we don't always have a great solution.
> > > I've written a KIP to address this problem here:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > >
> > > Have a look and let me know what you think.
> > >
> > > Thanks,
> > > Jason
> >
> > --
> > Grant Henke
> > Software Engineer | Cloudera
> > grant@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
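[Editor's note: Grant's pause/poll workaround (steps 1-5 in his message above) can be sketched as follows. `SimpleConsumer` is a hypothetical stand-in interface so the sketch is self-contained and compilable without a broker; its method names are illustrative, not the actual KafkaConsumer API, though the real client exposes equivalent poll/pause/resume/commit operations.]

```java
import java.util.List;

// Sketch of the "pause/poll" heartbeat pattern described in the thread.
class PausePollSketch {

    // Hypothetical minimal consumer surface needed by the pattern.
    interface SimpleConsumer {
        List<String> poll(long timeoutMs); // returns records; empty while paused
        void pauseAll();                   // stop fetching on all assigned partitions
        void resumeAll();                  // resume fetching
        void commitSync();                 // commit processed offsets
    }

    // One iteration of the pattern: poll a batch, pause, process while
    // heartbeating via empty poll() calls, then commit and un-pause.
    static int processOneBatch(SimpleConsumer consumer, long heartbeatEveryNRecords) {
        List<String> batch = consumer.poll(1000);   // 1. fetch a batch of records
        consumer.pauseAll();                        // 2. pause all partitions
        int processed = 0;
        for (String record : batch) {               // 3. process the batch
            // ... application-specific processing of `record` goes here ...
            processed++;
            if (processed % heartbeatEveryNRecords == 0) {
                consumer.poll(0);                   // 3a. "heartbeat": returns no records while paused
            }
        }
        consumer.commitSync();                      // 4. commit offsets...
        consumer.resumeAll();                       //    ...and un-pause
        return processed;                           // 5. caller repeats from step 1
    }
}
```

The trick works because a poll() on a fully paused consumer fetches no records yet still drives the client's coordinator communication, so the session stays alive during long processing. KIP-62's background heartbeat thread is intended to make this workaround unnecessary.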