kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread
Date Mon, 06 Jun 2016 18:27:43 GMT
Jiangjie:

About doing the rebalance in the background thread, I'm a bit concerned as
it will change a lot of the concurrency guarantees that the consumer
currently provides (think of a consumer caller thread committing externally
while the rebalance is happening in the background thread), and hence if we
are considering changing that now or in the future, we need to think through
all the corner cases.

So in general, I'd still prefer we reserve a third config for rebalance
timeout in this KIP.

Guozhang


On Mon, Jun 6, 2016 at 11:25 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> (+ Matthias)
>
> Hello Henry,
>
> Specifically to your question regarding Kafka Streams:
>
> 1. Currently restoreActiveState() is triggered in the onPartitionsAssigned
> callback, which is after the rebalance is completed from the coordinator's
> point of view, and hence is covered by the process timeout value in this
> new KIP.
>
> 2. That is a good question, and I think it is the general root cause of
> the directory-locking failures already reported by more than one use case.
> Currently I believe the main reason that a second rebalance is triggered
> while the processors are still completing restoreActiveState() of the
> previous rebalance is the session timeout (default 30 seconds), which
> will be largely mitigated by a larger process timeout. However, with
> complex topologies that have tens / hundreds of state stores,
> restoreActiveState() for all states may still take a long time, and there
> are other cases that can also cause consumers to re-join the group right
> after a previous rebalance, for example 1) regex subscription where the
> topic metadata has changed, 2) consecutive consumer failures, or new
> consumers (i.e. new KStream instances / threads) being added.
>
> For such cases we can do a better job of "failing fast" if the consumer
> detects that another join is needed. I think one of your local commits
> already does something similar, which we can merge back to trunk.
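A minimal sketch of the "fail fast" idea, assuming the restoring thread can check some signal that another rebalance is pending (for example, a flag set when a heartbeat response reports that the group is rebalancing). `StateStoreRestorer`, `FailFastRestorer` and the `rejoinNeeded` supplier are hypothetical names for illustration, not Kafka Streams APIs.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical: restores a single state store (e.g. by replaying its changelog).
interface StateStoreRestorer {
    void restore();
}

// Sketch of "fail fast": check for a pending rejoin between stores and stop
// early, instead of restoring every store for an assignment about to change.
final class FailFastRestorer {
    private final BooleanSupplier rejoinNeeded; // hypothetical signal from the consumer

    FailFastRestorer(BooleanSupplier rejoinNeeded) {
        this.rejoinNeeded = rejoinNeeded;
    }

    /** Returns how many stores were restored before finishing or bailing out. */
    int restoreAll(List<StateStoreRestorer> stores) {
        int restored = 0;
        for (StateStoreRestorer store : stores) {
            if (rejoinNeeded.getAsBoolean()) {
                break; // another join is needed: give up on the remaining stores
            }
            store.restore();
            restored++;
        }
        return restored;
    }
}
```

Checking only between stores means one very large store can still delay the rejoin; a real implementation might also check periodically while replaying a single changelog.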
>
>
>
> Guozhang
>
>
> On Sun, Jun 5, 2016 at 11:24 PM, Henry Cai <hcai@pinterest.com.invalid> wrote:
>
>> I have a question on the KIP about the long stall during
>> ProcessorStateManager.restoreActiveState(), which can stall for a long
>> time when we need to rebuild the RocksDB state on a new node.
>>
>> 1. Is restoreActiveState() considered post-rebalance, since it is
>> invoked in the application's rebalance listener?
>> 2. While node A is spending a long time rebuilding the state in
>> restoreActiveState() from the previous rebalance, a new node (node B)
>> sends a new JoinGroup request to the coordinator. How long should the
>> coordinator wait for node A to finish restoreActiveState() from the
>> previous rebalance? restoreActiveState() can take more than 10 minutes
>> for a big state.
>>
>>
>> On Sun, Jun 5, 2016 at 10:46 PM, Becket Qin <becket.qin@gmail.com> wrote:
>>
>> > Hi Jason,
>> >
>> > Thanks for this very useful KIP. In general I am with Guozhang on the
>> > purpose of the three timeouts:
>> > 1) session timeout for consumer liveness,
>> > 2) process timeout (or maybe we should rename it to
>> >    max.poll.interval.ms) for application liveness,
>> > 3) rebalance timeout for faster rebalance in some failure cases.
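The three roles can be made concrete with a small hypothetical model that reports which timeout would expire first for a member, given elapsed times. `LivenessTimeouts` and `Expired` are illustrative names, not consumer or coordinator classes, and the order of the checks is an arbitrary choice for the sketch.

```java
// Hypothetical model of the three timeouts; not the consumer's implementation.
final class LivenessTimeouts {
    enum Expired { NONE, SESSION, PROCESS, REBALANCE }

    private final long sessionTimeoutMs;   // 1) consumer liveness: heartbeats stopped
    private final long processTimeoutMs;   // 2) application liveness: poll() not called
    private final long rebalanceTimeoutMs; // 3) upper bound on a member's rebalance time

    LivenessTimeouts(long sessionTimeoutMs, long processTimeoutMs, long rebalanceTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.processTimeoutMs = processTimeoutMs;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    /**
     * @param sinceHeartbeatMs time since the last heartbeat
     * @param sincePollMs      time since the application last called poll()
     * @param inRebalanceMs    time spent in the current rebalance, or -1 if none
     */
    Expired check(long sinceHeartbeatMs, long sincePollMs, long inRebalanceMs) {
        if (sinceHeartbeatMs > sessionTimeoutMs) {
            return Expired.SESSION;
        }
        if (sincePollMs > processTimeoutMs) {
            return Expired.PROCESS;
        }
        if (inRebalanceMs > rebalanceTimeoutMs) {
            return Expired.REBALANCE;
        }
        return Expired.NONE;
    }
}
```

Combining 2) and 3), as the current KIP proposes, corresponds in this model to passing the same value for `processTimeoutMs` and `rebalanceTimeoutMs`.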
>> >
>> > It seems the current discussion is mainly about whether we need 3) as a
>> > separate timeout or not. The current KIP proposal is to combine 2) and
>> > 3), i.e. just use the process timeout as the rebalance timeout. That
>> > means we need to either increase the rebalance timeout to match the
>> > process timeout, or the reverse. It would be helpful to understand the
>> > impact of these two cases. Here are my two cents.
>> >
>> > Users consuming data from Kafka usually care about either throughput
>> > or latency.
>> >
>> > If users care about latency, they would probably care more about
>> > average latency than about 99.99th percentile latency, which can be
>> > affected by many causes more common than consumer failure. Because all
>> > the timeouts we are discussing here only affect the 99.99th percentile
>> > latency, I don't think they would really make a difference for
>> > latency-sensitive users.
>> >
>> > The majority of the use cases for Kafka Connect and Mirror Maker are
>> > throughput-sensitive. Ewen raised a good example where Kafka Connect
>> > needs to process the previous data on rebalance and therefore requires a
>> > higher rebalance timeout than process timeout. This is essentially the
>> > same in Mirror Maker, where each rebalance needs to flush all the
>> > messages in the producer's accumulator. That could take some time
>> > depending on how many messages there are. In this case, we may need to
>> > increase the process timeout to make it the same as the rebalance
>> > timeout. But this is probably fine. The downside of increasing the
>> > process timeout is a longer time to detect a consumer failure.
>> > Detecting a consumer failure a little later has only a limited impact
>> > because the rest of the consumers in the same group are still working
>> > fine, so the total throughput is unlikely to drop significantly. As
>> > long as the rebalance does not take longer, it should be fine. We care
>> > more about how fast a rebalance can finish because during a rebalance no
>> > consumer in the group is consuming, i.e. throughput is zero. So we want
>> > the rebalance to finish as quickly as possible.
>> >
>> > Compared with raising the process timeout to the rebalance timeout, it
>> > seems more common for users to want a longer process timeout but a
>> > smaller rebalance timeout. I am more worried about this case, where we
>> > have to shoehorn the rebalance timeout into the process timeout. For
>> > users who care about throughput, that might make the rebalance take
>> > unnecessarily long. Admittedly this only has an impact when a consumer
>> > has a problem during the rebalance, but depending on how high the
>> > process timeout is set, the rebalance could potentially take forever,
>> > as Guozhang mentioned.
>> >
>> > I agree with Guozhang that we can start with 1) and 2) and add 3) later
>> > if needed. But adding a rebalance timeout is more involved than just
>> > adding a configuration: it also means the rebalance has to be done in
>> > the background heartbeat thread. Hence we would have to synchronize the
>> > rebalance with consumer.poll() as we did in the old consumer. Otherwise
>> > users may lose messages if auto commit is enabled, or a manual commit
>> > might fail after consumer.poll() because the partitions might have been
>> > reassigned. So having a separate rebalance timeout also potentially
>> > means a big change for users.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> >
>> > On Fri, Jun 3, 2016 at 11:45 AM, Jason Gustafson <jason@confluent.io> wrote:
>> >
>> > > Hey Ewen,
>> > >
>> > > I confess your comments caught me off guard. It never occurred to me
>> > > that anyone would ask for a rebalance timeout so that it could be set
>> > > _larger_ than the process timeout. Even with buffered or batch
>> > > processing, I would usually expect flushing before a rebalance to take
>> > > no more time than a periodic flush. Otherwise, I'd probably try to see
>> > > if there was some workload I could push into periodic flushes so that
>> > > rebalances could complete faster. But supposing this isn't possible or
>> > > practical in some cases, I'm wondering how limiting it would be in
>> > > practice to have only the one timeout. I'm a little reluctant to add
>> > > the additional timeout since I think most users would not have a
>> > > strong need to keep a tight bound on normal processing time. (I'm also
>> > > reminded that Jay mentioned he might have to dock everyone's pay 5% for
>> > > each new timeout we introduce ;-)
>> > >
>> > > Thanks,
>> > > Jason
>> > >
>> > >
>> > >
>> > >
>> > > On Thu, Jun 2, 2016 at 7:30 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>> > >
>> > > > Hi Ewen,
>> > > >
>> > > > I think you are right: the rebalance process could potentially
>> > > > involve all the delayed compute / IO. More specifically, this is how
>> > > > I think of the rebalance process:
>> > > >
>> > > > 1. Coordinator decides to rebalance; starts ticking on the rebalance
>> > > >    timeout.
>> > > > 2. Consumer realizes a rebalance is needed when calling poll();
>> > > >    triggers onPartitionsRevoked().
>> > > > 3. Consumer sends JoinGroupRequest.
>> > > > 4. Coordinator sends JoinGroupResponse; starts ticking on the leader.
>> > > > 5. Leader computes the assignment and sends SyncGroupRequest.
>> > > > 6. Coordinator sends SyncGroupResponse; starts ticking on the session
>> > > >    timeout.
>> > > > 7. Consumer gets the new assignment; triggers onPartitionsAssigned().
>> > > >
>> > > > In the above process, delayed compute / IO is usually done in step
>> > > > 2), workload initialization is usually done in step 7), and some
>> > > > admin work (as in Kafka Streams) is likely to be done in step 5). In
>> > > > the current KIP proposal, the rebalance timeout on the coordinator
>> > > > starts ticking at step 1) for everyone in the group and stops at step
>> > > > 3); it starts ticking again for the leader at step 4) and stops upon
>> > > > step 5). Hence the delayed compute / IO contained in step 2) is
>> > > > covered by this rebalance timeout.
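The seven steps can be encoded as a small table mapping each step to the coordinator timer that runs from that step until the next one. This is a hypothetical illustration of the description above, not group coordinator code; the enum names are made up, steps 4) and 5) are modeled from the leader's point of view, and marking steps 3) and 5) as `NONE` is an assumption, since the description names no timer there.

```java
// Coordinator-side timer running from each step until the next, following the
// seven steps described above (an illustration, not coordinator code).
enum CoordinatorTimer { REBALANCE_ALL_MEMBERS, REBALANCE_LEADER, SESSION, NONE }

enum RebalanceStep {
    COORDINATOR_DECIDES(CoordinatorTimer.REBALANCE_ALL_MEMBERS),   // 1) starts for everyone
    ON_PARTITIONS_REVOKED(CoordinatorTimer.REBALANCE_ALL_MEMBERS), // 2) delayed compute / IO
    JOIN_GROUP_REQUEST(CoordinatorTimer.NONE),                     // 3) member's timer stops
    JOIN_GROUP_RESPONSE(CoordinatorTimer.REBALANCE_LEADER),        // 4) leader computes assignment
    SYNC_GROUP_REQUEST(CoordinatorTimer.NONE),                     // 5) leader's timer stops
    SYNC_GROUP_RESPONSE(CoordinatorTimer.SESSION),                 // 6) session timeout starts
    ON_PARTITIONS_ASSIGNED(CoordinatorTimer.SESSION);              // 7) workload initialization

    final CoordinatorTimer timerUntilNextStep;

    RebalanceStep(CoordinatorTimer timerUntilNextStep) {
        this.timerUntilNextStep = timerUntilNextStep;
    }

    /** True if work done after this step (and before the next) counts against the rebalance timeout. */
    boolean coveredByRebalanceTimeout() {
        return timerUntilNextStep == CoordinatorTimer.REBALANCE_ALL_MEMBERS
                || timerUntilNextStep == CoordinatorTimer.REBALANCE_LEADER;
    }
}
```

In this table the step-2) work is covered by the rebalance timeout and the step-7) work is not, which is the distinction the paragraph above draws.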
>> > > >
>> > > > That being said, I think that in the worst case, the time to process
>> > > > a single record would still be similar to the time to rebalance,
>> > > > since both could require completing all the delayed compute / IO so
>> > > > far. And since the "processing timeout" is meant to cover the worst
>> > > > case, it should still be OK?
>> > > >
>> > > >
>> > > > Guozhang
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On Thu, Jun 2, 2016 at 5:55 PM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
>> > > >
>> > > > > Jason,
>> > > > >
>> > > > > I've been thinking about this more in terms of something like
>> > > > > Connect. I think the rebalance timeout may be a bit different from
>> > > > > the process timeout, and even the process timeout is a bit of a
>> > > > > misnomer.
>> > > > >
>> > > > > We sort of talk about the process timeout as if it can be an
>> > > > > indicator of maximum processing time for a record/batch. This makes
>> > > > > sense for a case of a data-dependent load (i.e. you can only load
>> > > > > some data from slow storage after seeing some data) where that load
>> > > > > might be very large compared to normal processing time. It also
>> > > > > makes sense if you have auto commit enabled, because you need to be
>> > > > > completely finished processing the data before calling poll()
>> > > > > again, so the time before you call another consumer API actually
>> > > > > reflects processing time.
>> > > > >
>> > > > > It might make less sense in cases like streams (or any other app)
>> > > > > that batch writes to disk, or connectors that "process" a message
>> > > > > by enqueuing the data but won't commit offsets until the data is
>> > > > > flushed, possibly during some other, much later iteration of
>> > > > > processing. In these cases I think processing time and rebalance
>> > > > > time could potentially differ significantly. During normal
>> > > > > processing, you can potentially pipeline quite a bit, buffering up
>> > > > > changes, flushing as needed, but then only committing once flushing
>> > > > > is complete. But rebalancing is different: you *must* finish
>> > > > > flushing all the data or manually choose to discard the data
>> > > > > (presumably by doing something like watching for the process
>> > > > > timeout you set and bailing early, only committing the offsets for
>> > > > > data you've flushed). If you have lots of data built up, the cost of
>> > > > > rebalancing could be a *lot* higher than the maximum time you would
>> > > > > otherwise see between calls to consumer APIs to indicate processing
>> > > > > progress.
>> > > > >
>> > > > > The thing that makes these cases different is that processing isn't
>> > > > > actually tied to calls to the consumer API. You can queue up /
>> > > > > pipeline / defer some of the work. (By the way, this is currently a
>> > > > > limitation of sink connectors that I'm not thrilled about: offset
>> > > > > commit requires a full flush, whereas some coordination with the
>> > > > > sink connector to not require a full flush except on rebalances
>> > > > > would be much nicer, albeit more difficult for sink connectors to
>> > > > > implement.)
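A minimal sketch of the buffered pattern described above, assuming a sink that enqueues records during normal processing and treats an offset as committable only once the record has been written out. On revocation it flushes until a deadline (for example one derived from the process timeout), returns only the offsets for flushed data, and discards the rest. `RecordWriter`, `BufferedSink` and its methods are hypothetical; a real connector would drive this from its rebalance listener and commit through the consumer. The sketch assumes offsets within a partition are enqueued in increasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical: pushes one buffered record to the external system.
interface RecordWriter {
    void write(int partition, long offset);
}

final class BufferedSink {
    private static final class Pending {
        final int partition;
        final long offset;

        Pending(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Deque<Pending> buffer = new ArrayDeque<>();
    // Per partition: next offset to consume; everything below it has been flushed.
    private final Map<Integer, Long> committable = new HashMap<>();
    private final RecordWriter writer;
    private final LongSupplier clockMs;

    BufferedSink(RecordWriter writer, LongSupplier clockMs) {
        this.writer = writer;
        this.clockMs = clockMs;
    }

    /** Normal processing: just enqueue; nothing becomes committable yet. */
    void enqueue(int partition, long offset) {
        buffer.addLast(new Pending(partition, offset));
    }

    /** Rebalance path: flush in order until empty or past the deadline. */
    Map<Integer, Long> flushUntil(long deadlineMs) {
        while (!buffer.isEmpty() && clockMs.getAsLong() < deadlineMs) {
            Pending p = buffer.removeFirst();
            writer.write(p.partition, p.offset);
            committable.put(p.partition, p.offset + 1);
        }
        return new HashMap<>(committable); // only offsets for data that reached storage
    }

    /** Bail out: drop unflushed records; they will be re-consumed from the committed offset. */
    int discardUnflushed() {
        int dropped = buffer.size();
        buffer.clear();
        return dropped;
    }
}
```

The deadline makes the trade-off explicit: a longer deadline delays the rebalance, while a shorter one means more records are discarded and re-consumed.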
>> > > > >
>> > > > > -Ewen
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson <jason@confluent.io> wrote:
>> > > > >
>> > > > > > Hey Guozhang,
>> > > > > >
>> > > > > > I'm actually not too concerned about the time spent in the
>> > > > > > rebalance callback specifically. Both it and regular processing
>> > > > > > time in the poll loop will delay the rebalance and keep joined
>> > > > > > consumers idle. However, if we expose the rebalance timeout, it
>> > > > > > would give users the option to effectively disable the process
>> > > > > > timeout while still keeping a maximum bound on the rebalance time.
>> > > > > > If the consumer cannot complete its processing fast enough and
>> > > > > > rejoin, then it would be evicted. This provides something like
>> > > > > > (2), since the other consumers in the group would be able to
>> > > > > > complete the rebalance and resume work while the evicted consumer
>> > > > > > would have to roll back its progress. This is not too different
>> > > > > > from rebalancing in the background, which would also typically
>> > > > > > cause a commit failure and rollback (though at least the consumer
>> > > > > > stays in the group).
>> > > > > >
>> > > > > > Now that I'm thinking about it more, I'm not sure this would be a
>> > > > > > great facility to depend on in practice. It might be OK if just
>> > > > > > one or two of the consumers fall out of the group during the
>> > > > > > rebalance, but if half the group is regularly getting evicted, it
>> > > > > > would be a problem. So even if we expose the rebalance timeout, the
>> > > > > > user is still going to have to set it with some idea in mind about
>> > > > > > how long processing should take.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Jason
>> > > > > >
>> > > > > > On Thu, Jun 2, 2016 at 2:46 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hi Jason,
>> > > > > > >
>> > > > > > > With the current usage pattern of:
>> > > > > > >
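>> > > > > > > while (running) {
>> > > > > > >   // the rebalance, if any, happens inside poll(), on this thread
>> > > > > > >   ConsumerRecords<String, String> records = consumer.poll(100);
>> > > > > > >
>> > > > > > >   // process messages
>> > > > > > >   for (ConsumerRecord<String, String> record : records)
>> > > > > > >     process(record);
>> > > > > > > }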
>> > > > > > >
>> > > > > > > ----------
>> > > > > > >
>> > > > > > > And since the rebalance is still on the caller thread, not the
>> > > > > > > background thread, if the coordinator decides to rebalance while
>> > > > > > > the user thread is still processing messages, we have no option
>> > > > > > > but to go with 1), right? I think your / Onur's point here, which
>> > > > > > > I agree with, is that by reusing the process timeout as the
>> > > > > > > rebalance timeout, if the rebalance callback can take longer than
>> > > > > > > processing a batch, users need to set the timeout value to the
>> > > > > > > higher of the two, i.e. the callback latency, which will make
>> > > > > > > detection of processing stalls less effective, right?
>> > > > > > >
>> > > > > > > As I mentioned in my previous email, I feel that this case of
>> > > > > > > "callback function taking longer than processing a batch" would
>> > > > > > > not be frequent in practice, and the processing timeout would
>> > > > > > > usually be a good upper bound on the callback function latency.
>> > > > > > > If that is true, I'd suggest we keep the current proposal and
>> > > > > > > not add a third timeout config to cover this case.
>> > > > > > >
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > >
>> > > > > > > On Thu, Jun 2, 2016 at 10:40 AM, Jason Gustafson <jason@confluent.io> wrote:
>> > > > > > >
>> > > > > > > > Hey Guozhang,
>> > > > > > > >
>> > > > > > > > I think the problem is that users may not want to sacrifice
>> > > > > > > > rebalance latency because of uncertainty around processing
>> > > > > > > > time. As soon as a rebalance begins, there are basically two
>> > > > > > > > choices:
>> > > > > > > >
>> > > > > > > > 1. Block the rebalance until all consumers have finished their
>> > > > > > > >    current processing.
>> > > > > > > > 2. Let all consumers rebalance and "rollback" any processing
>> > > > > > > >    that could not be committed before the rebalance completes.
>> > > > > > > >
>> > > > > > > > If you choose option (1), then you have an incentive to keep a
>> > > > > > > > relatively tight bound on process.timeout.ms in order to reduce
>> > > > > > > > the worst-case idle time during a rebalance. But if you fail to
>> > > > > > > > set it high enough, then you'll get spurious rebalances during
>> > > > > > > > normal processing. I think Onur is saying that this still sort
>> > > > > > > > of sucks for users. On the other hand, if (2) is acceptable,
>> > > > > > > > then users will have more freedom to err on the high side when
>> > > > > > > > setting process.timeout.ms, or even disable it entirely. They
>> > > > > > > > will have to deal with rolling back any progress which cannot
>> > > > > > > > be committed after the rebalance completes, but maybe this is
>> > > > > > > > less of a problem for some users?
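Option (2) can be sketched with a hypothetical single-partition worker that remembers the group generation it joined with. If the generation has moved on by the time it commits, the commit is rejected and everything processed since the last successful commit is rolled back. In the Java consumer this situation surfaces as a `CommitFailedException` from `commitSync()`; `GenerationAwareWorker` below only models the idea and is not a Kafka class.

```java
// Hypothetical model of option (2): the rebalance is not blocked; progress
// that could not be committed before it completed is discarded.
final class GenerationAwareWorker {
    private final int joinedGeneration; // generation this member joined with
    private long committedOffset;       // next offset to consume, as last committed
    private long position;              // next offset to process

    GenerationAwareWorker(int joinedGeneration, long committedOffset) {
        this.joinedGeneration = joinedGeneration;
        this.committedOffset = committedOffset;
        this.position = committedOffset;
    }

    void process(int records) {
        position += records;
    }

    /**
     * Commits the current position if the group is still on the generation this
     * member joined with; otherwise rolls back to the last committed offset.
     */
    boolean commit(int currentGeneration) {
        if (currentGeneration != joinedGeneration) {
            position = committedOffset; // roll back uncommitted progress
            return false;
        }
        committedOffset = position;
        return true;
    }

    long position() {
        return position;
    }

    long committedOffset() {
        return committedOffset;
    }
}
```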
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Jason
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hi Onur, Jason:
>> > > > > > > > >
>> > > > > > > > > Here are some thoughts about reusing the process timeout as the
>> > > > > > > > > server-side rebalance timeout. First of all, my understanding
>> > > > > > > > > is that:
>> > > > > > > > >
>> > > > > > > > > 1) The session timeout is for detecting consumer crashes / hard
>> > > > > > > > > failures (in this case the heartbeat thread will be dead as
>> > > > > > > > > well, hence the coordinator notices within the session
>> > > > > > > > > timeout).
>> > > > > > > > >
>> > > > > > > > > 2) The process timeout is for checking the liveness of the user
>> > > > > > > > > thread that calls the consumer and does the processing: when
>> > > > > > > > > no consumer calls are made within the process timeout, the
>> > > > > > > > > heartbeat thread stops working, and hence the coordinator will
>> > > > > > > > > detect it.
>> > > > > > > > >
>> > > > > > > > > 3) A potential server-side rebalance timeout would be used to
>> > > > > > > > > detect consumer liveness during the rebalance period, in which
>> > > > > > > > > the user thread is tied to the "poll" call and the callback
>> > > > > > > > > function, to prevent a consumer that is slow / stalled in its
>> > > > > > > > > rebalance callback from making the rebalance take forever.
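The mechanism in 2) can be sketched as a background thread that heartbeats on the user thread's behalf but goes quiet once poll() has not been called within the process timeout, so the coordinator eventually notices through the session timeout. `HeartbeatGate` and the `sendHeartbeat` callback are hypothetical; this illustrates the behavior described above and is not the consumer's implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch: heartbeats continue only while the user thread keeps
// calling poll() within the process timeout.
final class HeartbeatGate {
    private final long processTimeoutMs;
    private final LongSupplier clockMs;
    private final AtomicLong lastPollMs;

    HeartbeatGate(long processTimeoutMs, LongSupplier clockMs) {
        this.processTimeoutMs = processTimeoutMs;
        this.clockMs = clockMs;
        this.lastPollMs = new AtomicLong(clockMs.getAsLong());
    }

    /** Called by the user thread each time it calls poll(). */
    void onPoll() {
        lastPollMs.set(clockMs.getAsLong());
    }

    /** Called by the heartbeat thread before each heartbeat. */
    boolean shouldHeartbeat() {
        return clockMs.getAsLong() - lastPollMs.get() <= processTimeoutMs;
    }

    /** Starts the background loop; the caller must shut down the returned executor. */
    ScheduledExecutorService start(long intervalMs, Runnable sendHeartbeat) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        exec.scheduleAtFixedRate(() -> {
            if (shouldHeartbeat()) {
                sendHeartbeat.run();
            }
            // otherwise stay silent, so the session timeout expires on the coordinator
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
        return exec;
    }
}
```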
>> > > > > > > > >
>> > > > > > > > > I think we generally have two cases in practice regarding 3)
>> > > > > > > > > above: the user either does almost nothing in the callback and
>> > > > > > > > > hence should never stall (unless there is a long GC), or does
>> > > > > > > > > various external IO, for example to maintain their own state,
>> > > > > > > > > which could take long or even cause the thread to stall. We do
>> > > > > > > > > not need to worry too much about the former case, and for the
>> > > > > > > > > latter case the process timeout value should usually be a good
>> > > > > > > > > upper bound on the rebalance latency.
>> > > > > > > > >
>> > > > > > > > > That being said, if we observe that there is indeed a common
>> > > > > > > > > usage where 2) and 3) would require very different timeout
>> > > > > > > > > values, enough to outweigh the complexity of three timeout
>> > > > > > > > > values, we can consider adding a third one then: it is easier
>> > > > > > > > > to add more configs later.
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > What do you think?
>> > > > > > > > >
>> > > > > > > > > Guozhang
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Tue, May 31, 2016 at 2:35 PM, Jason Gustafson <jason@confluent.io> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hey Onur,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for the detailed response. I think the problem of
>> > > > > > > > > > controlling rebalance times is the main (known) gap in the
>> > > > > > > > > > proposal as it stands.
>> > > > > > > > > >
>> > > > > > > > > > > This burden goes away if you loosen the liveness property by
>> > > > > > > > > > > having a required rebalance time and optional processing
>> > > > > > > > > > > time where rebalance happens in the background thread as
>> > > > > > > > > > > stated in the KIP.
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Just to clarify, the current KIP only allows rebalances to
>> > > > > > > > > > complete in the foreground. When I suggested above, in reply
>> > > > > > > > > > to Grant, that we could add a separate rebalance timeout
>> > > > > > > > > > setting, the behavior I had in mind was to let the consumer
>> > > > > > > > > > fall out of the group if the timeout is reached while the
>> > > > > > > > > > consumer is still processing. I was specifically trying to
>> > > > > > > > > > avoid moving the rebalance to the background thread, since
>> > > > > > > > > > this significantly increases the complexity of the
>> > > > > > > > > > implementation. We'd also have to think about compatibility a
>> > > > > > > > > > bit more. For example, what are the implications of having
>> > > > > > > > > > the rebalance listener execute in a separate thread?
>> > > > > > > > > >
>> > > > > > > > > > Putting that issue aside, I think we need to convince
>> > > > > > > > > > ourselves that a separate rebalance timeout is really
>> > > > > > > > > > necessary, since every new timeout adds some conceptual noise
>> > > > > > > > > > which all users will see. My thought in this KIP was that
>> > > > > > > > > > users who didn't want the burden of tuning the process
>> > > > > > > > > > timeout could use a relatively large value without a major
>> > > > > > > > > > impact, because group rebalances themselves will typically be
>> > > > > > > > > > infrequent. The main concern is for users who have highly
>> > > > > > > > > > variable processing times and want to ensure a tight bound on
>> > > > > > > > > > rebalance times (even if it means having to discard some
>> > > > > > > > > > processing that cannot be completed before the rebalance
>> > > > > > > > > > finishes). These users will be left trying to tune
>> > > > > > > > > > process.timeout.ms and max.poll.records, which is basically
>> > > > > > > > > > the same position they are currently in. The problem is that
>> > > > > > > > > > I don't know how common this case is, so I'm not sure how it
>> > > > > > > > > > weighs against the cost of having an additional timeout that
>> > > > > > > > > > needs to be explained. We can always add the rebalance
>> > > > > > > > > > timeout later, but it will be tough to remove once it's
>> > > > > > > > > > there. All the same, I'm not that keen on another iteration
>> > > > > > > > > > of this problem, so if we believe this use case is common
>> > > > > > > > > > enough, then maybe we should add it now.
>> > > > > > > > > >
>> > > > > > > > > > Thanks,
>> > > > > > > > > > Jason
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Sat, May 28, 2016 at 3:10 AM, Onur Karaman <onurkaraman.apache@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Thanks for the KIP writeup, Jason.
>> > > > > > > > > > >
>> > > > > > > > > > > Before anything else, I just wanted to point out that it's
>> > > > > > > > > > > worth mentioning the "heartbeat.interval.ms" consumer config
>> > > > > > > > > > > in the KIP for completeness. Today this config only starts to
>> > > > > > > > > > > kick in if poll is called frequently enough. A separate
>> > > > > > > > > > > heartbeat thread should make this config behave more like
>> > > > > > > > > > > what people would expect: a separate thread sending
>> > > > > > > > > > > heartbeats at the configured interval.
>> > > > > > > > > > >
>> > > > > > > > > > > With this KIP, the relevant configs become:
>> > > > > > > > > > > "max.poll.records" - already exists
>> > > > > > > > > > > "session.timeout.ms" - already exists
>> > > > > > > > > > > "heartbeat.interval.ms" - already exists
>> > > > > > > > > > > "process.timeout.ms" - new
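As a concrete illustration, the sketch below collects these settings into a `Properties` object. The first three are existing consumer configs; `process.timeout.ms` is only the name proposed in this KIP draft, and the values are arbitrary examples rather than recommendations. The helper encodes the Kafka documentation's guidance that `heartbeat.interval.ms` must be lower than `session.timeout.ms` and typically no higher than a third of it; `ConsumerTimeoutConfig` is a hypothetical name.

```java
import java.util.Properties;

// Consumer settings named in this thread; values are arbitrary examples.
final class ConsumerTimeoutConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put("max.poll.records", "500");       // existing: cap on records per poll()
        props.put("session.timeout.ms", "10000");   // existing: hard-failure detection
        props.put("heartbeat.interval.ms", "3000"); // existing: heartbeat cadence
        props.put("process.timeout.ms", "300000");  // proposed in this KIP draft only
        return props;
    }

    /** Documented guidance: heartbeat interval no more than 1/3 of the session timeout. */
    static boolean heartbeatWithinGuidance(Properties props) {
        long sessionMs = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeatMs = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return heartbeatMs * 3 <= sessionMs;
    }
}
```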
>> > > > > > > > > > >
>> > > > > > > > > > > After reading the KIP several times, I think it would be
>> > > > > > > > > > > helpful to be more explicit about the desired outcome. Is it
>> > > > > > > > > > > trying to make best/average/worst-case rebalance times
>> > > > > > > > > > > faster? Is it trying to make the clients need less
>> > > > > > > > > > > configuration tuning?
>> > > > > > > > > > >
>> > > > > > > > > > > Also it seems that brokers probably still want to enforce
>> > > > > > > > > > > minimum and maximum rebalance timeouts, just as with the
>> > > > > > > > > > > minimum and maximum session timeouts, so DelayedJoins don't
>> > > > > > > > > > > stay in purgatory indefinitely. So we'd add new
>> > > > > > > > > > > "group.min.rebalance.timeout.ms" and
>> > > > > > > > > > > "group.max.rebalance.timeout.ms" broker configs, which again
>> > > > > > > > > > > might need to be brought up in the KIP. Let's say we add
>> > > > > > > > > > > these bounds. A side effect of having broker-side bounds on
>> > > > > > > > > > > rebalance timeouts in combination with Java clients that make
>> > > > > > > > > > > process timeouts the same as rebalance timeouts is that the
>> > > > > > > > > > > broker effectively dictates the max processing time allowed
>> > > > > > > > > > > between poll calls. This gotcha exists right now with today's
>> > > > > > > > > > > broker-side bounds on session timeouts. So I'm not really
>> > > > > > > > > > > convinced that the proposal gets rid of this complication
>> > > > > > > > > > > mentioned in the KIP.
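The gotcha can be sketched as a broker-side range check. The bounds correspond to the `group.min.rebalance.timeout.ms` / `group.max.rebalance.timeout.ms` configs proposed here (they do not exist today), by analogy with how the coordinator already rejects a JoinGroup whose session timeout falls outside `group.min.session.timeout.ms` / `group.max.session.timeout.ms`. `RebalanceTimeoutBounds` is hypothetical. If a client sends its process timeout as the rebalance timeout, any process timeout above the broker's maximum cannot join, which is exactly how the broker ends up capping processing time between polls.

```java
// Hypothetical broker-side validation of a requested rebalance timeout.
final class RebalanceTimeoutBounds {
    private final long minMs; // proposed group.min.rebalance.timeout.ms
    private final long maxMs; // proposed group.max.rebalance.timeout.ms

    RebalanceTimeoutBounds(long minMs, long maxMs) {
        if (minMs > maxMs) {
            throw new IllegalArgumentException("min bound exceeds max bound");
        }
        this.minMs = minMs;
        this.maxMs = maxMs;
    }

    /** A JoinGroup carrying this rebalance timeout would be rejected if this is false. */
    boolean accepts(long requestedRebalanceTimeoutMs) {
        return requestedRebalanceTimeoutMs >= minMs && requestedRebalanceTimeoutMs <= maxMs;
    }
}
```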
>> > > > > > > > > > >
>> > > > > > > > > > > I think the main question to ask is: does the KIP actually
>> > > > > > > > > > > make a difference?
>> > > > > > > > > > >
>> > > > > > > > > > > It looks like this KIP improves rebalance times specifically
>> > > > > > > > > > > when the client currently has processing times large enough
>> > > > > > > > > > > to force larger session timeouts and to keep heartbeat
>> > > > > > > > > > > intervals from being honored. Separating session timeouts
>> > > > > > > > > > > from processing time means clients can keep their
>> > > > > > > > > > > "session.timeout.ms" low so the coordinator can quickly
>> > > > > > > > > > > detect process failure, and honoring a low
>> > > > > > > > > > > "heartbeat.interval.ms" on the separate heartbeat thread
>> > > > > > > > > > > means clients will be quickly notified of group membership
>> > > > > > > > > > > and subscription changes, all without placing difficult
>> > > > > > > > > > > expectations on processing time. But even so, rebalancing
>> > > > > > > > > > > through the calling thread means the slowest-processing
>> > > > > > > > > > > client in the group will still be the rate-limiting step
>> > > > > > > > > > > when looking at rebalance times.
>> > > > > > > > > > >
>> > > > > > > > > > > From a usability perspective, the burden still seems like it
>> > > > > > > > > > > will be tuning the processing time to keep the "progress
>> > > > > > > > > > > liveness" happy during rebalances while still having
>> > > > > > > > > > > reasonable upper bounds on rebalance times. It still looks
>> > > > > > > > > > > like users have to do almost exactly the same tricks as today
>> > > > > > > > > > > when the group membership changes due to slow processing
>> > > > > > > > > > > times, even though all the consumers are alive and the topics
>> > > > > > > > > > > haven't changed:
>> > > > > > > > > > > 1. Increase the rebalance timeout to give more time for
>> > > > > > > > > > >    record processing (the difference compared to today is
>> > > > > > > > > > >    that we bump the rebalance timeout instead of the session
>> > > > > > > > > > >    timeout).
>> > > > > > > > > > > 2. Reduce the number of records handled on each iteration
>> > > > > > > > > > >    with max.poll.records.
>> > > > > > > > > > >
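The two tricks trade off against each other through simple arithmetic: a full batch of max.poll.records records has to be processed before the relevant timeout expires. The sketch below assumes a known worst-case per-record processing time and a hypothetical 80% headroom; neither value comes from this thread, and the helper name is illustrative.

```python
def max_safe_poll_records(
    timeout_ms: int, worst_case_ms_per_record: int, headroom_pct: int = 80
) -> int:
    """Largest max.poll.records value whose worst-case batch fits in the timeout.

    headroom_pct is a hypothetical safety margin: only that percentage of the
    timeout is budgeted for processing. A result of 0 means even one record can
    exceed the budget, so the timeout itself has to be raised (trick 1).
    """
    if worst_case_ms_per_record <= 0:
        raise ValueError("worst_case_ms_per_record must be positive")
    # Integer arithmetic keeps the budget exact (no float rounding surprises).
    budget_ms = timeout_ms * headroom_pct // 100
    return budget_ms // worst_case_ms_per_record
```

With a 30 000 ms timeout and 50 ms per record in the worst case, the budget is 24 000 ms, so at most 480 records per poll fit.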
>> > > > > > > > > > > > This burden goes away if you loosen the liveness property by having a
>> > > > > > > > > > > > required rebalance time and optional processing time where rebalance
>> > > > > > > > > > > > happens in the background thread as stated in the KIP.
>> > > > > > > > > > >
>> > > > > > > > > > > > On Thu, May 26, 2016 at 12:40 PM, Jason Gustafson <jason@confluent.io> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hey Grant,
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks for the feedback. I'm definitely open to including heartbeat() in
>> > > > > > > > > > > > > this KIP. One thing we should be clear about is what the behavior of
>> > > > > > > > > > > > > heartbeat() should be when the group begins rebalancing. I think there are
>> > > > > > > > > > > > > basically two options:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > 1. heartbeat() simply keeps heartbeating even if the group has started
>> > > > > > > > > > > > >    rebalancing.
>> > > > > > > > > > > > > 2. heartbeat() completes the rebalance itself.
>> > > > > > > > > > > >
>> > > > > > > > > > > > > With the first option, when processing takes longer than the rebalance
>> > > > > > > > > > > > > timeout, the member will fall out of the group which will cause an offset
>> > > > > > > > > > > > > commit failure when it finally finishes. However, if processing finishes
>> > > > > > > > > > > > > before the rebalance completes, then offsets can still be committed. On the
>> > > > > > > > > > > > > other hand, if heartbeat() completes the rebalance itself, then you'll
>> > > > > > > > > > > > > definitely see the offset commit failure for any records being processed.
>> > > > > > > > > > > > > So the first option is sort of biased toward processing completion while
>> > > > > > > > > > > > > the latter is biased toward rebalance completion.
>> > > > > > > > > > > >
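The trade-off between the two options can be written as a small decision function. This is a simplified Python model, assuming a single slow member whose remaining processing time is measured from the moment the rebalance begins; the enum and function names are illustrative, and heartbeat() itself was only a proposal at this point in the discussion.

```python
from enum import Enum


class HeartbeatPolicy(Enum):
    # Option 1: heartbeat() keeps heartbeating even after a rebalance starts.
    KEEP_HEARTBEATING = 1
    # Option 2: heartbeat() completes the rebalance itself.
    COMPLETE_REBALANCE = 2


def in_flight_commit_succeeds(
    policy: HeartbeatPolicy, remaining_processing_ms: int, rebalance_timeout_ms: int
) -> bool:
    """Hypothetical model: can offsets for the batch being processed when a
    rebalance begins still be committed?"""
    if policy is HeartbeatPolicy.COMPLETE_REBALANCE:
        # The rebalance finishes underneath the application, so the commit for
        # the in-flight records fails regardless of how fast processing is.
        return False
    # Option 1: the rebalance waits for this member up to the rebalance timeout.
    # Finishing in time means the commit succeeds; otherwise the member has
    # already fallen out of the group and the commit fails.
    return remaining_processing_ms <= rebalance_timeout_ms
```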
>> > > > > > > > > > > > > I'm definitely not a fan of the second option since it takes away the
>> > > > > > > > > > > > > choice to finish processing before rejoining. However, I do see some
>> > > > > > > > > > > > > benefit in the first option if the user wants to keep rebalance time low
>> > > > > > > > > > > > > and doesn't mind being kicked out of the group if processing takes longer
>> > > > > > > > > > > > > during a rebalance. This may be a reasonable tradeoff since consumer groups
>> > > > > > > > > > > > > are presumed to be stable most of the time. A better option in that case
>> > > > > > > > > > > > > might be to expose the rebalance timeout to the user directly since it
>> > > > > > > > > > > > > would allow the user to use an essentially unbounded process.timeout.ms for
>> > > > > > > > > > > > > highly variant processing while still keeping rebalance time limited. Of
>> > > > > > > > > > > > > course, it would be another timeout for the user to understand...
>> > > > > > > > > > > >
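Exposing the rebalance timeout separates two bounds that would otherwise be the same number. A minimal sketch of that reasoning, assuming the rebalance would otherwise wait as long as process.timeout.ms allows; the model and the function name are illustrative and not the KIP's final design.

```python
import math
from typing import Optional


def max_rebalance_wait_ms(
    process_timeout_ms: Optional[int], rebalance_timeout_ms: Optional[int] = None
) -> float:
    """How long a rebalance may wait on one slow member (hypothetical model).

    Without a separate, user-visible rebalance timeout, the rebalance is bounded
    only by the process timeout; None stands for an essentially unbounded value.
    """
    if rebalance_timeout_ms is not None:
        # A dedicated rebalance timeout caps the wait independently.
        return float(rebalance_timeout_ms)
    if process_timeout_ms is None:
        # Unbounded processing with no rebalance timeout: a rebalance could stall forever.
        return math.inf
    return float(process_timeout_ms)
```

Under this model, an unbounded process timeout is only safe for the group when paired with an explicit rebalance timeout.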
>> > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > Jason
>> > > > > > > > > > > >
>> > > > > > > > > > > > > On Thu, May 26, 2016 at 8:19 AM, Grant Henke <ghenke@cloudera.com> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi Jason,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks for writing up a proposal (and a thorough one)! This is something
>> > > > > > > > > > > > > > that I had been thinking about this week too as I have run into it more
>> > > > > > > > > > > > > > than a handful of times now.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > I like the idea of having a larger processing timeout; that timeout, in
>> > > > > > > > > > > > > > unison with max.poll.records, should in many cases provide a reasonable
>> > > > > > > > > > > > > > assurance that the consumer will stay alive.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > In the rejected alternatives, "Add a separate API the user can call to
>> > > > > > > > > > > > > > indicate liveness" is listed. I think a heartbeat API could be added along
>> > > > > > > > > > > > > > with these new timeout configurations and used for "advanced" use cases
>> > > > > > > > > > > > > > where the processing time could be highly variant and less predictable. I
>> > > > > > > > > > > > > > think a place where we might use the heartbeat API in Kafka is MirrorMaker.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Today, I have seen people trying to find ways to leverage the existing api
>> > > > > > > > > > > > > > to "force" heartbeats by:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > 1. Calling poll to get the batch of records to process
>> > > > > > > > > > > > > > 2. Calling pause on all partitions
>> > > > > > > > > > > > > > 3. Processing the record batch
>> > > > > > > > > > > > > > 3a. While processing, periodically calling poll (which is essentially just
>> > > > > > > > > > > > > >     a heartbeat since it returns no records while paused)
>> > > > > > > > > > > > > > 4. Committing offsets and un-pausing
>> > > > > > > > > > > > > > 5. Repeating from 1
>> > > > > > > > > > > > >
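Grant's workaround can be sketched against an in-memory stand-in, since a real run would need a broker. FakeConsumer and process_with_paused_polling are hypothetical names; the method names only loosely echo the Java KafkaConsumer API, and the sketch assumes, as in the consumer under discussion, that heartbeats go out whenever poll() is called.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Record = Tuple[str, int, str]  # (partition, offset, value)


class FakeConsumer:
    """Hypothetical in-memory stand-in for a consumer. Method names loosely mirror
    KafkaConsumer (poll, pause, resume, assignment, commitSync); no broker involved."""

    def __init__(self, records_by_partition: Dict[str, List[str]]) -> None:
        self._pending = {p: list(vals) for p, vals in records_by_partition.items()}
        self._position = {p: 0 for p in records_by_partition}
        self._paused: set = set()
        self.committed: Dict[str, int] = {}
        self.poll_calls = 0

    def assignment(self) -> set:
        return set(self._pending)

    def pause(self, partitions: Iterable[str]) -> None:
        self._paused |= set(partitions)

    def resume(self, partitions: Iterable[str]) -> None:
        self._paused -= set(partitions)

    def poll(self, max_records: int = 2) -> List[Record]:
        # A real pre-KIP-62 consumer heartbeats from poll(); here we just count calls.
        self.poll_calls += 1
        batch: List[Record] = []
        for p in sorted(self._pending):
            if p in self._paused:
                continue  # paused partitions return no records
            while self._pending[p] and len(batch) < max_records:
                batch.append((p, self._position[p], self._pending[p].pop(0)))
                self._position[p] += 1
        return batch

    def commit_sync(self) -> None:
        # Commit the next offset to consume for every partition.
        self.committed = dict(self._position)


def process_with_paused_polling(
    consumer: FakeConsumer, handle: Callable[[str], str], poll_every: int = 1
) -> List[str]:
    """Poll on paused partitions during processing so that poll-driven
    heartbeats continue without fetching more data."""
    results: List[str] = []
    while True:
        batch = consumer.poll()                 # 1. get a batch of records
        if not batch:
            return results                      # fake consumer drained; a real loop keeps polling
        partitions = consumer.assignment()
        consumer.pause(partitions)              # 2. pause all partitions
        for i, (_, _, value) in enumerate(batch):
            results.append(handle(value))       # 3. process the batch
            if (i + 1) % poll_every == 0:
                if consumer.poll():             # 3a. heartbeat-only poll
                    raise RuntimeError("paused partitions returned records")
        consumer.commit_sync()                  # 4. commit offsets ...
        consumer.resume(partitions)             #    ... and un-pause, then repeat
```

The poll_every knob controls how often the heartbeat-only poll happens; in practice it would be chosen so that the gap between polls stays well under the session timeout.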
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > Grant
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Wed, May 25, 2016 at 6:32 PM, Jason Gustafson <jason@confluent.io> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi All,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > One of the persistent problems we see with the new consumer is the use of
>> > > > > > > > > > > > > > > the session timeout in order to ensure progress. Whenever there is a delay
>> > > > > > > > > > > > > > > in message processing which exceeds the session timeout, no heartbeats can
>> > > > > > > > > > > > > > > be sent and the consumer is removed from the group. We seem to hit this
>> > > > > > > > > > > > > > > problem everywhere the consumer is used (including Kafka Connect and Kafka
>> > > > > > > > > > > > > > > Streams) and we don't always have a great solution. I've written a KIP to
>> > > > > > > > > > > > > > > address this problem here:
>> > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
>> > > > > > > > > > > > > > Have a look and let me know what you think.
>> > > > > > > > > > > > > >
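The mechanism the KIP title describes can be sketched in a few lines of Python. This is a minimal model, assuming a single member, an injectable clock for testing, and a hypothetical BackgroundHeartbeater class with on_poll()/tick() methods; it makes no network calls, and none of these names belong to the Kafka client.

```python
import threading
import time
from typing import Callable, Optional


class BackgroundHeartbeater:
    """Hypothetical sketch: heartbeats come from a background thread, so slow
    processing no longer starves them, but the thread only keeps the member
    alive while the application calls poll() within process_timeout_s."""

    def __init__(
        self,
        heartbeat_interval_s: float,
        process_timeout_s: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.heartbeat_interval_s = heartbeat_interval_s
        self.process_timeout_s = process_timeout_s
        self._clock = clock
        self._lock = threading.Lock()
        self._last_poll = clock()
        self._last_heartbeat: Optional[float] = None
        self.heartbeats_sent = 0
        self.left_group = False

    def on_poll(self) -> None:
        """Called from the application thread each time it polls for records."""
        with self._lock:
            self._last_poll = self._clock()

    def tick(self) -> bool:
        """One iteration of the background loop; True if a heartbeat was sent."""
        with self._lock:
            if self.left_group:
                return False
            now = self._clock()
            if now - self._last_poll > self.process_timeout_s:
                # The application has stopped making progress: stop heartbeating
                # and give up membership so the group can rebalance without it.
                self.left_group = True
                return False
            due = (
                self._last_heartbeat is None
                or now - self._last_heartbeat >= self.heartbeat_interval_s
            )
            if due:
                # A real client would send a heartbeat request to the coordinator here.
                self._last_heartbeat = now
                self.heartbeats_sent += 1
            return due

    def run(self, stop: threading.Event) -> None:
        """Background thread body: tick at twice the heartbeat rate until stopped."""
        while not stop.is_set() and not self.left_group:
            self.tick()
            stop.wait(self.heartbeat_interval_s / 2)
```

Because tick() reads time from the injected clock, the two liveness rules (heartbeat interval and process timeout) can be checked deterministically without starting the thread; run() is only the thin loop a real background thread would execute.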
>> > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > Jason
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > --
>> > > > > > > > > > > > > Grant Henke
>> > > > > > > > > > > > > Software Engineer | Cloudera
>> > > > > > > > > > > > > > grant@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > --
>> > > > > > > > > -- Guozhang
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > Thanks,
>> > > > > Ewen
>> > > > >
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > -- Guozhang
>> > > >
>> > >
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
