From: Jason Gustafson
Date: Wed, 4 Jan 2017 09:35:37 -0800
Subject: Re: [DISCUSS] KIP-102 - Add close with timeout for consumers
To: dev@kafka.apache.org

Hi Rajini,

Thanks for the clarification. I looked again at the patch and I see what
you're saying now. I was confused because I assumed the request timeout was
being enforced on the requests themselves, but it is more that the request
timeout bounds the attempt to send them in addition to the time to receive a
response, right? So it is possible that we time out before even getting a
chance to send the OffsetCommit (for example).

I think I'd still prefer timing out quicker by default if possible. The one
case where it might be worthwhile waiting longer is when there are pending
offset commits sent through commitSync() or commitAsync(). But if we're not
actually doing retries or coordinator rediscovery, I'm not sure the
additional time helps that much.

-Jason

On Wed, Jan 4, 2017 at 8:27 AM, Rajini Sivaram wrote:

> Hi Jason,
>
> Thank you for the review.
>
> During close(), if there is a rebalance and the coordinator has to be
> rediscovered, close terminates without trying to find the coordinator. The
> poll() loop within close terminates if the coordinator is not known (as it
> does now) or if the timeout expires. At the moment, that timeout is a
> hard-coded 5-second timeout. The PR changes that to min(closeTimeout,
> requestTimeout). So even if there are pending commits, the maximum wait
> will be requestTimeout in the final poll() loop of close().
>
> In addition to this, before the poll loop, there is a
> maybeAutoCommitOffsetsSync(). At the moment, this does not have a timeout
> and can wait indefinitely. The PR introduces a timeout for this commit
> invoked from close(). The timeout is min(closeTimeout, requestTimeout).
> Hence the maximum timeout of (2 * requestTimeout) for any close. Have I
> missed something?
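The two bounded phases described above (the synchronous auto-commit, then the final poll() loop) can be modelled as a small budget calculation. This is a minimal sketch, not Kafka's implementation: the class and method names are hypothetical, it assumes the second phase receives min(remaining close timeout, requestTimeout) and that each phase may use its full allowance in the worst case, and 305000 ms is used only as an example request.timeout.ms value.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical model of the worst-case duration of consumer close():
 *   1. the synchronous auto-commit before closing, then
 *   2. the final poll() loop that waits for pending requests.
 * Each phase is capped at min(remaining close timeout, requestTimeout).
 */
final class CloseBudget {

    private CloseBudget() {}

    /** Time one phase may wait: the smaller of what is left and the request timeout. */
    static long phaseCapMs(long remainingMs, long requestTimeoutMs) {
        return Math.max(0L, Math.min(remainingMs, requestTimeoutMs));
    }

    /** Worst case when both phases use their entire allowance. */
    static long worstCaseMs(long closeTimeoutMs, long requestTimeoutMs) {
        long remaining = closeTimeoutMs;
        long commitPhase = phaseCapMs(remaining, requestTimeoutMs);
        remaining -= commitPhase; // assumption: the poll loop only gets what is left
        long pollPhase = phaseCapMs(remaining, requestTimeoutMs);
        return commitPhase + pollPhase;
    }

    public static void main(String[] args) {
        long requestTimeoutMs = 305_000L; // example request.timeout.ms value

        // Unbounded close timeout: bounded by 2 * requestTimeout instead.
        System.out.println(worstCaseMs(Long.MAX_VALUE, requestTimeoutMs)); // 610000

        // 30-second close timeout: the close timeout itself is the bound.
        System.out.println(worstCaseMs(TimeUnit.SECONDS.toMillis(30), requestTimeoutMs)); // 30000
    }
}
```

With these example inputs, an unbounded close timeout still yields a worst case of 2 x 305 s, roughly 10 minutes, while a 30-second close timeout caps the whole call at 30 seconds.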
>
> I had chosen Long.MAX_VALUE as the default close timeout to be consistent
> with the Producer. But perhaps a lower timeout of 30 seconds is more
> meaningful for the Consumer, since the consumer typically has less to do.
> Even with (2 * requestTimeout), the default would be 20 minutes, which is
> perhaps too high anyway. I will update the KIP.
>
>
> On Wed, Jan 4, 2017 at 3:16 AM, Jason Gustafson wrote:
>
> > Hey Rajini,
> >
> > Thanks for the KIP. I had a quick look at the patch and the impact
> > doesn't seem too bad. Just wanted to clarify one point. This is from the
> > KIP:
> >
> > > The existing close() method without a timeout will attempt to close the
> > > consumer gracefully with a timeout of Long.MAX_VALUE. Since commit and
> > > leave group requests are timed out after the request timeout, the upper
> > > bound will be approximately 2*request.timeout.ms (around 10 minutes by
> > > default).
> >
> > I don't think this is quite right. There could be one or more pending
> > OffsetCommit requests (sent using commitAsync) that we may have to await.
> > We could also be in the middle of a group rebalance. The other
> > complication is what happens in the event of a request timeout. Usually
> > the consumer will rediscover the coordinator. Would we do that as well in
> > close() and retry any failed requests if there is time remaining, or would
> > we just fail the remaining requests and return? In any case, it may not be
> > so easy to set an upper bound on the default timeout.
> >
> > With that in mind, I'm wondering whether waiting indefinitely should be
> > the default. In the case of the OffsetCommit before closing (when
> > autocommit is enabled) or the LeaveGroup, it's more or less OK if these
> > requests fail. Maybe we should consider them best effort (as is currently
> > done) and wait a reasonable amount of time (say 30 seconds) for their
> > completion.
> > I'd rather have "nice" behavior out of the box and let users who want
> > indefinite blocking use Long.MAX_VALUE themselves. What do you think?
> >
> > Thanks,
> > Jason
> >
> > On Wed, Dec 21, 2016 at 4:39 AM, Rajini Sivaram wrote:
> >
> > > I have added some more detail to the "Proposed Changes" section. I have
> > > also created a preliminary PR for the JIRA
> > > (https://github.com/apache/kafka/pull/2285).
> > >
> > > I am using *request.timeout.ms* to bound individual requests during
> > > close (the KIP does not address timeouts in any other code path) to
> > > ensure that *close()* always completes within a bounded time even when
> > > a timeout is not specified. This is similar to the producer, where
> > > requests are aborted after *request.timeout.ms*. The PR contains unit
> > > and integration tests for all the close scenarios I could think of (but
> > > there could be more).
> > >
> > >
> > > On Mon, Dec 19, 2016 at 10:32 PM, Guozhang Wang wrote:
> > >
> > > > +1 on this idea as well.
> > > >
> > > > Streams has also added a similar feature itself, partly because the
> > > > consumer does not support it directly (the other part of the reason is
> > > > that, like brokers, Streams also has some exception handling logic
> > > > which could lead to deadlock with a careless System.exit). For the
> > > > consumer itself, I think the trickiness lies in the prefetching calls
> > > > as well as cleaning up commit / HB requests within the timeout, and I
> > > > agree with Ewen that it's better to merge this early in the release
> > > > cycle than at the last minute.
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Dec 19, 2016 at 4:18 AM, Rajini Sivaram <rajinisivaram@gmail.com> wrote:
> > > >
> > > > > Thank you for the reviews.
> > > > >
> > > > > @Becket @Ewen, I agree that making all blocking calls have a timeout
> > > > > will be trickier, and hence the scope of this KIP is limited to
> > > > > close().
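Rajini's note about using request.timeout.ms to bound each request during close(), combined with Jason's suggestion to treat the pre-close OffsetCommit and LeaveGroup as best effort, can be sketched with a plain CompletableFuture standing in for an in-flight request. This is a minimal sketch under those assumptions: the class and the awaitBestEffort helper are hypothetical, and this is not how the consumer's own network client tracks requests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical best-effort wait for one in-flight request during close(). */
final class BestEffortClose {

    private BestEffortClose() {}

    /**
     * Waits at most min(time left of closeTimeoutMs since startNanos, requestTimeoutMs).
     * Returns true if the request completed successfully; a timeout or failure
     * is swallowed so that close() can carry on.
     */
    static boolean awaitBestEffort(CompletableFuture<?> pending, long startNanos,
                                   long closeTimeoutMs, long requestTimeoutMs) {
        // Elapsed time is a difference of nanoTime values, so a closeTimeoutMs
        // of Long.MAX_VALUE cannot overflow here.
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        long remainingMs = closeTimeoutMs - elapsedMs;
        long waitMs = Math.max(0L, Math.min(remainingMs, requestTimeoutMs));
        try {
            pending.get(waitMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false; // best effort: abandon this request
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long closeTimeoutMs = 200L;   // example values only
        long requestTimeoutMs = 100L;

        CompletableFuture<String> committed = CompletableFuture.completedFuture("committed");
        CompletableFuture<String> neverAnswered = new CompletableFuture<>(); // never completes

        System.out.println(awaitBestEffort(committed, start, closeTimeoutMs, requestTimeoutMs));     // true
        System.out.println(awaitBestEffort(neverAnswered, start, closeTimeoutMs, requestTimeoutMs)); // false
    }
}
```

The per-request cap keeps a single unresponsive request from consuming an unbounded close timeout, which is the property Rajini describes for close() when no timeout is specified.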
> > > > >
> > > > > @Jay Yes, this should definitely go into the release notes; I will
> > > > > make sure it is added. I will add some integration tests with broker
> > > > > failures for testing the timeout, but they cannot completely
> > > > > eliminate the risk of a hang. Over time, hopefully system tests will
> > > > > help catch most issues.
> > > > >
> > > > >
> > > > > On Sat, Dec 17, 2016 at 1:15 AM, Jay Kreps wrote:
> > > > >
> > > > > > I think this is great. It sounds like one implication is that
> > > > > > existing code that called close() and hit the timeout would now
> > > > > > hang indefinitely. We saw this kind of thing a lot in automated
> > > > > > testing scenarios where people don't correctly sequence their
> > > > > > shutdown of client and server. I think this is okay, but it might
> > > > > > be good to include in the release notes.
> > > > > >
> > > > > > -jay
> > > > > >
> > > > > > On Thu, Dec 15, 2016 at 5:32 AM, Rajini Sivaram <rsivaram@pivotal.io> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I have just created KIP-102 to add a new close method for
> > > > > > > consumers with a timeout parameter, making Consumer consistent
> > > > > > > with Producer:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
> > > > > > >
> > > > > > > Comments and suggestions are welcome.
> > > > > > >
> > > > > > > Thank you...
> > > > > > >
> > > > > > > Regards,
> > > > > > >
> > > > > > > Rajini
> > > > >
> > > > > --
> > > > > Regards,
> > > > >
> > > > > Rajini
> > > >
> > > >
> > > > --
> > > > -- Guozhang
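The API shape under discussion in this thread, a bounded close(timeout, unit) alongside a no-argument close() that applies a default, can be sketched with a hypothetical interface using the (long, TimeUnit) parameter style of KafkaProducer.close. The names TimedCloseable, DEFAULT_CLOSE_TIMEOUT_MS and RecordingClient are illustrative only, and the 30-second default reflects Jason's suggestion rather than a decision recorded here; callers who want indefinite blocking pass Long.MAX_VALUE explicitly.

```java
import java.util.concurrent.TimeUnit;

/** Declared first: the single-file source launcher runs the first top-level class. */
final class TimedCloseDemo {
    public static void main(String[] args) {
        RecordingClient client = new RecordingClient();

        client.close(); // no-argument close applies the default bound
        System.out.println(client.lastTimeoutMs); // 30000

        client.close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); // caller opts into waiting indefinitely
        System.out.println(client.lastTimeoutMs); // 9223372036854775807
    }
}

/** Hypothetical client interface with a bounded close (names are illustrative). */
interface TimedCloseable extends AutoCloseable {

    /** Assumed default for close(); 30 s follows Jason's suggestion in the thread. */
    long DEFAULT_CLOSE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);

    /** Close, waiting at most the given time for best-effort cleanup. */
    void close(long timeout, TimeUnit unit);

    /** The no-argument close delegates to the bounded variant. */
    @Override
    default void close() {
        close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }
}

/** Stand-in implementation that only records the timeout it was given. */
final class RecordingClient implements TimedCloseable {
    long lastTimeoutMs = -1L;

    @Override
    public void close(long timeout, TimeUnit unit) {
        // A real client would commit offsets and leave the group here, bounded by the timeout.
        lastTimeoutMs = unit.toMillis(timeout);
    }
}
```

Keeping the bound in the no-argument close() gives the "nice" out-of-the-box behavior Jason asks for, while the explicit overload leaves indefinite waiting available to callers who choose it.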