kafka-dev mailing list archives

From Rajini Sivaram <rajinisiva...@gmail.com>
Subject Re: [DISCUSS] KIP-102 - Add close with timeout for consumers
Date Wed, 04 Jan 2017 19:42:16 GMT
Hi Jason,

Yes, we can potentially time out even before sending pending commits, once
the request timeout expires (default is > 5 minutes, so this should only
happen when there are real issues or when brokers are shut down). I have
updated the KIP to use a default timeout of 30 seconds for the existing
close() method.
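
As a rough illustration of the timeout arithmetic discussed in this thread
(a sketch only, not the code in the PR): each of the two blocking steps in
close() waits at most min(closeTimeout, requestTimeout). The class and
method names below are hypothetical, the 305000 ms request timeout is an
assumed value consistent with "default is > 5 minutes", and capping the
total at closeTimeout is an assumption about how the overall call behaves.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration only; not part of the Kafka client. */
public class CloseTimeoutSketch {

    /** Per-step wait described in the thread: min(closeTimeout, requestTimeout). */
    public static long stepTimeoutMs(long closeTimeoutMs, long requestTimeoutMs) {
        return Math.min(closeTimeoutMs, requestTimeoutMs);
    }

    /**
     * Worst-case wait for close(): two bounded steps (the synchronous
     * auto-commit, then the final poll loop). Capping the total at
     * closeTimeoutMs is an assumption of this sketch.
     */
    public static long maxCloseWaitMs(long closeTimeoutMs, long requestTimeoutMs) {
        long step = stepTimeoutMs(closeTimeoutMs, requestTimeoutMs);
        // Saturate instead of overflowing when both inputs are very large.
        long twoSteps = step > Long.MAX_VALUE / 2 ? Long.MAX_VALUE : 2 * step;
        return Math.min(closeTimeoutMs, twoSteps);
    }

    public static void main(String[] args) {
        // Assumed request timeout: just over 5 minutes.
        long requestTimeoutMs = 305_000L;

        long unboundedClose = maxCloseWaitMs(Long.MAX_VALUE, requestTimeoutMs);
        long thirtySecondClose =
                maxCloseWaitMs(TimeUnit.SECONDS.toMillis(30), requestTimeoutMs);

        System.out.println("closeTimeout=Long.MAX_VALUE -> up to " + unboundedClose + " ms");
        System.out.println("closeTimeout=30s -> up to " + thirtySecondClose + " ms");
    }
}
```

With these assumed inputs, a close timeout of Long.MAX_VALUE is bounded only
by the two request timeouts (roughly 10 minutes), while a 30 second close
timeout keeps the wait at 30 seconds.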

Since the code changes are limited to the close() code path, can we include
this in 0.10.2.0? If so, I can initiate the vote tomorrow.

Thank you...


On Wed, Jan 4, 2017 at 5:35 PM, Jason Gustafson <jason@confluent.io> wrote:

> Hi Rajini,
>
> Thanks for the clarification. I looked again at the patch and I see what
> you're saying now. I was confused because I assumed the request timeout was
> being enforced on the requests themselves, but it is more that the request
> timeout bounds the attempt to send them in addition to the time to receive
> a response, right? So it is possible that we time out before even getting a
> chance to send the OffsetCommit (for example).
>
> I think I'd still prefer timing out quicker by default if possible. The one
> case where it might be worthwhile waiting longer is when there are pending
> offset commits sent through commitSync() or commitAsync(). But if we're not
> actually doing retries or coordinator rediscovery, I'm not sure the
> additional time helps that much.
>
> -Jason
>
> On Wed, Jan 4, 2017 at 8:27 AM, Rajini Sivaram <rajinisivaram@gmail.com>
> wrote:
>
> > Hi Jason,
> >
> > Thank you for the review.
> >
> > During close(), if there is a rebalance and the coordinator has to be
> > rediscovered, close terminates without trying to find the coordinator.
> > The poll() loop within close terminates if the coordinator is not known
> > (as it does now) or if the timeout expires. At the moment, that timeout
> > is a hard-coded 5 second timeout. The PR changes that to
> > min(closeTimeout, requestTimeout). So even if there are pending commits,
> > the maximum wait will be requestTimeout in the final poll() loop of
> > close().
> >
> > In addition to this, before the poll loop, there is a
> > maybeAutoCommitOffsetsSync(). At the moment, this does not have a timeout
> > and can wait indefinitely. The PR introduces a timeout for this commit
> > invoked from close(). The timeout is min(closeTimeout, requestTimeout).
> > Hence the maximum timeout is (2 * requestTimeout) for any close. Have I
> > missed something?
> >
> > I had chosen Long.MAX_VALUE as the default close timeout to be consistent
> > with Producer. But perhaps a lower timeout of 30 seconds is more
> > meaningful for Consumer since the consumer typically has less to do. Even
> > with (2 * requestTimeout), the default would be around 10 minutes, which
> > is perhaps too high anyway. I will update the KIP.
> >
> >
> > On Wed, Jan 4, 2017 at 3:16 AM, Jason Gustafson <jason@confluent.io>
> > wrote:
> >
> > > Hey Rajini,
> > >
> > > Thanks for the KIP. I had a quick look at the patch and the impact
> > > doesn't seem too bad. Just wanted to clarify one point. This is from
> > > the KIP:
> > >
> > > > The existing close() method without a timeout will attempt to close
> > > > the consumer gracefully with a timeout of Long.MAX_VALUE. Since commit
> > > > and leave group requests are timed out after the request timeout, the
> > > > upper bound will be approximately 2*request.timeout.ms (around 10
> > > > minutes by default).
> > >
> > >
> > > I don't think this is quite right. There could be one or more pending
> > > OffsetCommit requests (sent using commitAsync) that we may have to
> > > await. We could also be in the middle of a group rebalance. The other
> > > complication is what happens in the event of a request timeout. Usually
> > > the consumer will rediscover the coordinator. Would we do that as well
> > > in close() and retry any failed requests if there is time remaining, or
> > > would we just fail the remaining requests and return? In any case, it
> > > may not be so easy to set an upper bound on the default timeout.
> > >
> > > With that in mind, I'm wondering whether waiting indefinitely should be
> > > the default. In the case of the OffsetCommit before closing (when
> > > autocommit is enabled) or the LeaveGroup, it's more or less OK if these
> > > requests fail. Maybe we should consider them best effort (as is
> > > currently done) and wait a reasonable amount of time (say 30 seconds)
> > > for their completion. I'd rather have "nice" behavior out of the box
> > > and let users who want indefinite blocking use Long.MAX_VALUE
> > > themselves. What do you think?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Wed, Dec 21, 2016 at 4:39 AM, Rajini Sivaram
> > > <rajinisivaram@gmail.com> wrote:
> > >
> > > > I have added some more detail to the "Proposed Changes" section. Also
> > > > created a preliminary PR for the JIRA
> > > > (https://github.com/apache/kafka/pull/2285).
> > > >
> > > > I am using request.timeout.ms to bound individual requests during
> > > > close (the KIP does not address timeouts in any other code path) to
> > > > ensure that close() always completes within a bounded time even when
> > > > timeout is not specified. This is similar to the producer where
> > > > requests are aborted after request.timeout.ms. The PR contains unit
> > > > and integration tests for all the close scenarios I could think of
> > > > (but there could be more).
> > > >
> > > >
> > > > On Mon, Dec 19, 2016 at 10:32 PM, Guozhang Wang <wangguoz@gmail.com>
> > > > wrote:
> > > >
> > > > > +1 on this idea as well.
> > > > >
> > > > > Streams has also added a similar feature itself, partly because the
> > > > > consumer does not support it directly (the other part of the reason
> > > > > is that, like brokers, Streams also has some exception handling
> > > > > logic which could lead to deadlock with a careless System.exit).
> > > > > For the consumer itself I think the trickiness lies in the
> > > > > prefetching calls as well as commit / HB requests cleanup with the
> > > > > timeout, and I agree with Ewen that it's better to be merged early
> > > > > in the release cycle than as a last minute merge.
> > > > >
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Dec 19, 2016 at 4:18 AM, Rajini Sivaram
> > > > > <rajinisivaram@gmail.com> wrote:
> > > > >
> > > > > > Thank you for the reviews.
> > > > > >
> > > > > > @Becket @Ewen, Agree that making all blocking calls have a timeout
> > > > > > will be trickier and hence the scope of this KIP is limited to
> > > > > > close().
> > > > > >
> > > > > > @Jay Yes, this should definitely go into release notes, will make
> > > > > > sure it is added. I will add some integration tests with broker
> > > > > > failures for testing the timeout, but they cannot completely
> > > > > > eliminate the risk of a hang. Over time, hopefully system tests
> > > > > > will help catch most issues.
> > > > > >
> > > > > > On Sat, Dec 17, 2016 at 1:15 AM, Jay Kreps <jay@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > I think this is great. Sounds like one implication is that
> > > > > > > existing code that called close() and hit the timeout would now
> > > > > > > hang indefinitely. We saw this kind of thing a lot in automated
> > > > > > > testing scenarios where people don't correctly sequence their
> > > > > > > shutdown of client and server. I think this is okay, but might
> > > > > > > be good to include in the release notes.
> > > > > > >
> > > > > > > -jay
> > > > > > >
> > > > > > > On Thu, Dec 15, 2016 at 5:32 AM, Rajini Sivaram
> > > > > > > <rsivaram@pivotal.io> wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I have just created KIP-102 to add a new close method for
> > > > > > > consumers with a timeout parameter, making Consumer consistent
> > > > > > > with Producer:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
> > > > > > >
> > > > > > > Comments and suggestions are welcome.
> > > > > > >
> > > > > > > Thank you...
> > > > > > >
> > > > > > > Regards,
> > > > > > >
> > > > > > > Rajini
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Regards,
> > > > > >
> > > > > > Rajini
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
