kafka-dev mailing list archives

From Rajini Sivaram <rajinisiva...@gmail.com>
Subject Re: [DISCUSS] KIP-102 - Add close with timeout for consumers
Date Wed, 04 Jan 2017 16:27:58 GMT
Hi Jason,

Thank you for the review.

During close(), if there is a rebalance and the coordinator has to be
rediscovered, close terminates without trying to find the coordinator. The
poll() loop within close terminates if the coordinator is not known (as it
does now) or if the timeout expires. At the moment, that timeout is a
hard-coded 5 second timeout. The PR changes that to min(closeTimeout,
requestTimeout). So even if there are pending commits, the maximum wait
will be requestTimeout in the final poll() loop of close().
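
A minimal sketch of the final wait loop described above, not the code from the PR. The class name, method name and the three callbacks are hypothetical stand-ins for the consumer internals; only the two exit conditions (coordinator unknown, or min(closeTimeout, requestTimeout) elapsed) come from the description.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class ClosePollSketch {

    /**
     * Polls until nothing is pending, the coordinator is unknown, or the
     * wait budget min(closeTimeoutMs, requestTimeoutMs) is used up.
     * Returns the number of poll iterations performed.
     * All parameters are illustrative, not the real consumer internals.
     */
    public static int awaitPendingRequests(long closeTimeoutMs,
                                           long requestTimeoutMs,
                                           BooleanSupplier coordinatorKnown,
                                           BooleanSupplier hasPendingRequests,
                                           Runnable pollOnce) {
        long budgetNanos = TimeUnit.MILLISECONDS.toNanos(
                Math.min(closeTimeoutMs, requestTimeoutMs));
        long start = System.nanoTime();
        int polls = 0;
        // Compare elapsed time against the budget so a huge budget cannot overflow.
        while (hasPendingRequests.getAsBoolean()
                && coordinatorKnown.getAsBoolean()
                && System.nanoTime() - start < budgetNanos) {
            pollOnce.run();
            polls++;
        }
        return polls;
    }

    public static void main(String[] args) {
        // Coordinator unknown: the loop exits at once, without rediscovery.
        int polls = awaitPendingRequests(Long.MAX_VALUE, 305_000L,
                () -> false, () -> true, () -> { });
        System.out.println("polls with unknown coordinator: " + polls);
    }
}
```

Even with a close timeout of Long.MAX_VALUE, the budget in this sketch collapses to the request timeout, which is what bounds the final loop.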

In addition to this, before the poll loop, there is a
maybeAutoCommitOffsetsSync(). At the moment, this does not have a timeout
and can wait indefinitely. The PR introduces a timeout for this commit
invoked from close(). The timeout is min(closeTimeout, requestTimeout).
Hence the maximum timeout of (2 * requestTimeout) for any close. Have I
missed something?
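
The arithmetic behind that bound can be sketched as follows, taking the description in this message at face value: two sequential phases (the synchronous auto-commit, then the final poll loop), each limited to min(closeTimeout, requestTimeout). The class and method names are hypothetical, and the 305 000 ms request timeout is an assumed value, chosen to match the "around 10 minutes" figure in the KIP text quoted below.

```java
public class CloseBoundSketch {

    /** Per-phase limit as described: min(closeTimeout, requestTimeout). */
    public static long phaseTimeoutMs(long closeTimeoutMs, long requestTimeoutMs) {
        return Math.min(closeTimeoutMs, requestTimeoutMs);
    }

    /** Worst case for close(): commit phase plus poll-loop phase. */
    public static long worstCaseCloseMs(long closeTimeoutMs, long requestTimeoutMs) {
        long phase = phaseTimeoutMs(closeTimeoutMs, requestTimeoutMs);
        // Saturate instead of overflowing when both timeouts are very large.
        return phase > Long.MAX_VALUE / 2 ? Long.MAX_VALUE : 2 * phase;
    }

    public static void main(String[] args) {
        long requestTimeoutMs = 305_000L; // assumed request.timeout.ms, not taken from the thread

        // close() with Long.MAX_VALUE: bounded by 2 * requestTimeout.
        System.out.println(worstCaseCloseMs(Long.MAX_VALUE, requestTimeoutMs)); // 610000

        // close() with a 30 second timeout: each phase is limited to 30 seconds.
        System.out.println(worstCaseCloseMs(30_000L, requestTimeoutMs)); // 60000
    }
}
```

This only models the two limits named above; the actual patch may account for time already spent in the first phase.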

I had chosen Long.MAX_VALUE as the default close timeout to be consistent with
Producer. But perhaps a lower timeout of 30 seconds is more meaningful for the
Consumer, since a consumer typically has less to do on close. Even with (2 *
requestTimeout), the default would be about 10 minutes, which is perhaps too
high anyway. I will update the KIP.


On Wed, Jan 4, 2017 at 3:16 AM, Jason Gustafson <jason@confluent.io> wrote:

> Hey Rajini,
>
> Thanks for the KIP. I had a quick look at the patch and the impact doesn't
> seem too bad. Just wanted to clarify one point. This is from the KIP:
>
> > The existing close() method without a timeout will attempt to close the
> > consumer gracefully with a timeout of Long.MAX_VALUE. Since commit and
> > leave group requests are timed out after the request timeout, the upper
> > bound will be approximately 2*request.timeout.ms (around 10 minutes by
> > default).
>
>
> I don't think this is quite right. There could be one or more pending
> OffsetCommit requests (sent using commitAsync) that we may have to await.
> We could also be in the middle of a group rebalance. The other complication
> is what happens in the event of a request timeout. Usually the consumer
> will rediscover the coordinator. Would we do that as well in close() and
> retry any failed requests if there is time remaining, or would we just fail
> the remaining requests and return? In any case, it may not be so easy to
> set an upper bound on the default timeout.
>
> With that in mind, I'm wondering whether waiting indefinitely should be the
> default. In the case of the OffsetCommit before closing (when autocommit is
> enabled) or the LeaveGroup, it's more or less OK if these requests fail.
> Maybe we should consider them best effort (as is currently done) and wait a
> reasonable amount of time (say 30 seconds) for their completion. I'd rather
> have "nice" behavior out of the box and let users who want indefinite
> blocking use Long.MAX_VALUE themselves. What do you think?
>
> Thanks,
> Jason
>
> On Wed, Dec 21, 2016 at 4:39 AM, Rajini Sivaram <rajinisivaram@gmail.com>
> wrote:
>
> > I have added some more detail to the "Proposed Changes" section. Also
> > created a preliminary PR for the JIRA (
> > https://github.com/apache/kafka/pull/2285).
> >
> > I am using request.timeout.ms to bound individual requests during close
> > (the KIP does not address timeouts in any other code path) to ensure that
> > close() always completes within a bounded time even when timeout is not
> > specified. This is similar to the producer where requests are aborted
> > after request.timeout.ms. The PR contains unit and integration tests
> > for all the close scenarios I could think of (but there could be more).
> >
> >
> > On Mon, Dec 19, 2016 at 10:32 PM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> >
> > > +1 on this idea as well.
> > >
> > > Streams has also added a similar feature itself partly because consumer
> > > does not support it directly (other part of the reason is that like
> > > brokers, streams also have some exception handling logic which could lead
> > > to deadlock with careless System.exit). For consumer itself I think the
> > > trickiness lies in the prefetching calls as well as commit / HB requests
> > > cleanup with the timeout, and I agree with Ewen that it's better to be
> > > merged in the early release cycle than a last minute merge.
> > >
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Dec 19, 2016 at 4:18 AM, Rajini Sivaram <rajinisivaram@gmail.com>
> > > wrote:
> > >
> > > > Thank you for the reviews.
> > > >
> > > > @Becket @Ewen, Agree that making all blocking calls have a timeout will be
> > > > trickier and hence the scope of this KIP is limited to close().
> > > >
> > > > @Jay Yes, this should definitely go into release notes, will make sure it
> > > > is added. I will add some integration tests with broker failures for
> > > > testing the timeout, but they cannot completely eliminate the risk of a
> > > > hang. Over time, hopefully system tests will help catch most issues.
> > > >
> > > >
> > > > On Sat, Dec 17, 2016 at 1:15 AM, Jay Kreps <jay@confluent.io> wrote:
> > > >
> > > > > I think this is great. Sounds like one implication is that existing code
> > > > > that called close() and hit the timeout would now hang indefinitely. We saw
> > > > > this kind of thing a lot in automated testing scenarios where people don't
> > > > > correctly sequence their shutdown of client and server. I think this is
> > > > > okay, but might be good to include in the release notes.
> > > > >
> > > > > -jay
> > > > >
> > > > > On Thu, Dec 15, 2016 at 5:32 AM, Rajini Sivaram <rsivaram@pivotal.io>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I have just created KIP-102 to add a new close method for consumers with a
> > > > > > timeout parameter, making Consumer consistent with Producer:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
> > > > > >
> > > > > > Comments and suggestions are welcome.
> > > > > >
> > > > > > Thank you...
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Rajini
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
