kafka-dev mailing list archives

From Jason Gustafson <ja...@confluent.io>
Subject Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id
Date Fri, 16 Nov 2018 08:04:46 GMT
>
> If we initialize a set of member names (I assume ids = names here) on
> broker through Admin API, the client needs to pick up this information
> simultaneously which I doubt if there is a generic way to achieve that? It
> would also make the scaling operations difficult if we need to define the
> member names every time we change the member set which is an extra
> operation burden. From my daily ops experience, dynamically generate member
> names on client side would be easier. Is there a good approach to address
> this issue?


Yeah, that's a good question. I'm hoping someone with more Kubernetes
experience will jump in here. Basically my goal is to have an approach
which maps nicely to StatefulSets (
https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/).
The pods in a stateful set have an ordinal index, which sounds similar to
the static ids that I was describing. You can scale up and down a stateful
set, but you would need a plugin to grow and shrink the consumer group.
Sounds like it could work, but I'm not sure if it's the best way.
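
As a rough illustration of the ordinal-to-id mapping: StatefulSet pods are
named `<statefulset-name>-<ordinal>`, so a client could derive a stable
member id from its own hostname. This is only a sketch. The helper names and
the `<group id>-<ordinal>` convention are assumptions made for illustration,
not something the KIP defines.

```python
import re


def pod_ordinal(hostname: str) -> int:
    """Extract the trailing ordinal from a StatefulSet pod name such as 'web-2'."""
    match = re.search(r"-(\d+)$", hostname)
    if match is None:
        raise ValueError(f"not a StatefulSet-style pod name: {hostname!r}")
    return int(match.group(1))


def static_member_id(group_id: str, hostname: str) -> str:
    """Hypothetical convention: '<group id>-<ordinal>'."""
    return f"{group_id}-{pod_ordinal(hostname)}"


# In a pod the hostname would normally come from socket.gethostname();
# a literal is used here so the example is deterministic.
print(static_member_id("orders", "orders-consumer-3"))  # orders-3
```

Because the ordinal survives pod restarts, the same pod would present the
same id after a bounce, which is the property the static ids need.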

> At Pinterest we maintain 24-7 SLA for 15 - 30 minutes reaction to all the
> critical streaming services abnormality. One of the burden was the night
> shift which requires the oncaller to quickly resolve the issue and get the
> streaming application back on track, however there is a chance of miss. My
> concern was that if we forfeit the timeout on static membership to trigger
> rebalance, missing some pages during midnight could be negatively
> impacting the system performance since we may realize that some partitions
> stop working for a couple of hours already until next morning. So
> registration timeout serves as the "last line of defense" to guarantee
> liveness if no human intervention jumps in.


Thanks, this is helpful background. I agree this is a risk in the approach
I've suggested. If we take a step back, I think there are two gaps in the
protocol for stateful applications:

1. We don't have a way to detect the same member across failures or
restarts. I think streams has some heuristic to try and handle the common
cases (such as rolling restarts), but the proposal here solves the problem
in a more robust way.

2. We don't have a way to know what the expected membership of the group
is. This leads us to try tricks like inserting delays into the rebalance
logic so that the group membership has time to stabilize before we make any
decisions. In your proposal, we have an expansion timeout, which is
basically the same thing as far as I can tell.

I think the first problem is the most important, but it would be nice if we
can solve the second problem as well. If we have a way to indicate the
expected group members, then the group can respond to a change much more
quickly. There would be no need to wait 5 minutes for all members to join
and it would be robust in the presence of failures. Ironically, static
membership in this case makes the group more dynamic ;).
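
To make the second gap concrete, here is a toy sketch of a group whose
expected members are declared up front. The `ExpectedGroup` class is
hypothetical and is not the coordinator's actual logic. It only shows that
the group can be considered complete the moment the last expected member
joins, with no fixed delay.

```python
class ExpectedGroup:
    """Toy coordinator-side view of a group with declared membership."""

    def __init__(self, expected_ids):
        self.expected = frozenset(expected_ids)
        self.joined = set()

    def join(self, member_id):
        """Record a join; return True once every expected member is present."""
        if member_id not in self.expected:
            raise KeyError(f"unexpected member: {member_id}")
        self.joined.add(member_id)
        return self.joined == self.expected

    def missing(self):
        """Expected members that have not joined yet, in sorted order."""
        return sorted(self.expected - self.joined)


group = ExpectedGroup(["0", "1", "2"])
group.join("0")
group.join("2")
print(group.missing())   # ['1']
print(group.join("1"))   # True: assignment can proceed immediately
```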

That said, I can see how the registration timeout would be an attractive
safety net in some cases. Perhaps it would be good enough if we have a way
to pre-register group members administratively? Members can still be
expired due to inactivity and we would have a way to get around the
rebalance delays. Would that work?
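
A minimal sketch of how pre-registration and inactivity expiry could
coexist, assuming a hypothetical `StaticRegistry` that keeps the registered
ids fixed and tracks liveness separately through heartbeats. The class,
method names and the timeout value are illustrative assumptions only.

```python
class StaticRegistry:
    """Registered ids stay fixed; liveness is tracked per member."""

    def __init__(self, member_ids, session_timeout_s):
        self.session_timeout_s = session_timeout_s
        # None means "registered but never seen".
        self.last_heartbeat = {m: None for m in member_ids}

    def heartbeat(self, member_id, now_s):
        """Record a heartbeat from a registered member at time now_s."""
        if member_id not in self.last_heartbeat:
            raise KeyError(f"member not registered: {member_id}")
        self.last_heartbeat[member_id] = now_s

    def registered_members(self):
        """All pre-registered ids, regardless of liveness."""
        return sorted(self.last_heartbeat)

    def active_members(self, now_s):
        """Members whose last heartbeat is within the session timeout."""
        return sorted(
            m for m, seen in self.last_heartbeat.items()
            if seen is not None and now_s - seen <= self.session_timeout_s
        )


registry = StaticRegistry(["0", "1"], session_timeout_s=10)
registry.heartbeat("0", now_s=0)
registry.heartbeat("1", now_s=5)
print(registry.active_members(now_s=12))   # ['1']
print(registry.registered_members())       # ['0', '1']
```

Member "0" has expired due to inactivity, so its partitions could be
reassigned, yet it is still registered and can rejoin under the same id.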

Thanks,
Jason


On Wed, Nov 14, 2018 at 10:24 PM, Boyang Chen <bchen11@outlook.com> wrote:

> Thank you for the clarification Jason! The proposals make sense here and
> let me continue the discussion.
>
> > Then the ids would be determined using some convention. Most likely, we
> > would just use sequential numbers 0, 1, 2, etc. We do the same thing for
> > partition ids.
>
>
> If we initialize a set of member names (I assume ids = names here) on
> broker through Admin API, the client needs to pick up this information
> simultaneously which I doubt if there is a generic way to achieve that? It
> would also make the scaling operations difficult if we need to define the
> member names every time we change the member set which is an extra
> operation burden. From my daily ops experience, dynamically generate member
> names on client side would be easier. Is there a good approach to address
> this issue?
>
> > I was thinking that the registration is specified ahead of time and
> > remains valid until changed. It would be more like a replica assignment.
> > We don't move partitions just because a broker is down. The expectation
> > is that the broker will eventually return.
>
>
> At Pinterest we maintain 24-7 SLA for 15 - 30 minutes reaction to all the
> critical streaming services abnormality. One of the burden was the night
> shift which requires the oncaller to quickly resolve the issue and get the
> streaming application back on track, however there is a chance of miss. My
> concern was that if we forfeit the timeout on static membership to trigger
> rebalance, missing some pages during midnight could be negatively impacting
> the system performance since we may realize that some partitions stop
> working for a couple of hours already until next morning. So registration
> timeout serves as the "last line of defense" to guarantee liveness if no
> human intervention jumps in.
>
>
> I'm very interested in the replication protocol currently implemented on
> Kafka, but I'm not familiar with it. If we do have mechanism to handle
> issues like I mentioned above for replication (auto healing during
> mid-night if one broker is never back), we could continue discussing the
> new approaches to have basic guarantee of consumer group liveness.
>
>
> The discussion so far is to make sure that all the design approaches we
> have taken are pointing to real scenarios. Once we clarify the scenarios,
> we would definitely propose better solution on top of it. I hope these
> discussions make sense. Thanks again for helping make the design solid!
>
>
> Boyang
>
> ________________________________
> From: Jason Gustafson <jason@confluent.io>
> Sent: Thursday, November 15, 2018 9:54 AM
> To: dev
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> >
> > I feel this would make the current protocol harder to use. For example, on
> > KStream we typically would expect (number of instances * number of threads
> > per instance) consumers. Giving out all this many member names in a list
> > may not be easy, compared with dynamic generation of member names, at least
> > for KStream use case.
>
>
> That's a fair point. What I had in mind is for the API to specify the
> number of consumers in the group. Then the ids would be determined using
> some convention. Most likely, we would just use sequential numbers 0, 1, 2,
> etc. We do the same thing for partition ids. Streams folks can chime in and
> say whether that would work or not.
>
> > So this suggests we will not rely on heartbeat and commit interval to
> > trigger rebalance? Even in static membership, I feel tracking active
> > members is still required to be handled by broker since not all users are
> > fully equipped with monitoring tools, otherwise users will feel
> > uncomfortable using static membership solely for reducing rebalance
> > purpose.
>
>
> Let me clarify. I think it is still useful to distinguish the liveness of
> the consumer using the heartbeat mechanism and to propagate that
> information to the leader during rebalances. This gives the group leader
> the option (but not the requirement) to change the partition assignment for
> inactive members. My suggestion was more about how long the static
> registration would remain valid. In the current proposal we have a
> registration timeout, and I think Mayuresh is fair to point out the
> potential confusion with the session timeout. It's kind of the same, but
> not exactly. So instead, I was thinking that the registration is specified
> ahead of time and remains valid until changed. It would be more like a
> replica assignment. We don't move partitions just because a broker is down.
> The expectation is that the broker will eventually return. Similarly for
> stateful applications, we would have the option to do the same thing. The
> registration is fixed until someone changes it.
>
> Does that make sense?
>
> Thanks,
> Jason
>
>
> On Wed, Nov 14, 2018 at 2:46 PM, Boyang Chen <bchen11@outlook.com> wrote:
>
> > Thanks Jason for the suggestions! I update the KIP with full schema
> > changes.
> >
> > > we offer an admin API that lets a user define the expected members of
> > > the group.
> >
> > I feel this would make the current protocol harder to use. For example, on
> > KStream we typically would expect (number of instances * number of threads
> > per instance) consumers. Giving out all this many member names in a list
> > may not be easy, compared with dynamic generation of member names, at least
> > for KStream use case.
> >
> > > For the sake of discussion, I was wondering if we could just say that
> > > static members do not expire.
> >
> >
> > So this suggests we will not rely on heartbeat and commit interval to
> > trigger rebalance? Even in static membership, I feel tracking active
> > members is still required to be handled by broker since not all users are
> > fully equipped with monitoring tools, otherwise users will feel
> > uncomfortable using static membership solely for reducing rebalance
> > purpose.
> >
> >
> > For the admin API design, I'm simplifying the join group request handling,
> > while using admin tool to switch between static and dynamic membership and
> > set the two corresponding timeouts. Do you think this approach makes sense?
> > The version one implementation will be much more clean if we handle
> > membership change through user intervention.
> >
> >
> > Best,
> >
> > Boyang
> >
> > ________________________________
> > From: Jason Gustafson <jason@confluent.io>
> > Sent: Wednesday, November 14, 2018 9:31 AM
> > To: dev
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > specifying member id
> >
> > Hey Boyang,
> >
> > Thanks for the updates. From a high level, I think this actually
> > complements Konstantine's writeup on incremental rebalancing. The gap we're
> > addressing is providing a way to bind the partition assignment of a
> > group to a set of user-provided ids so that we are not so reliant on the
> > group's immediate state. For example, these ids might identify the state
> > store volume for particular streams instances. This is basically what you
> > need to work well with k8s stateful sets (as far as I understand them).
> >
> > One key decision is how we would define and update the expected static
> > members in a consumer group. The mechanics of the registration and
> > expansion timeouts feel a little bit clunky. For the sake of discussion, I
> > was wondering if we could just say that static members do not expire.
> > Instead, we offer an admin API that lets a user define the expected members
> > of the group. This API could be used to both grow and shrink a group. This
> > would solve the rebalancing problems when applications are initially
> > bootstrapped or when they are restarted because we would always know how
> > many members should be in a group. What do you think?
> >
> > By the way, it would be helpful to include the full schema definition for
> > any protocol changes in the proposal.
> >
> > Thanks,
> > Jason
> >
> >
> > On Mon, Nov 12, 2018 at 8:56 AM, Boyang Chen <bchen11@outlook.com>
> wrote:
> >
> > > Thanks Mayuresh for the feedback! Do you have a quick example for
> passing
> > > in consumer config dynamically? I mainly use Kafka Streams at my daily
> > work
> > > so probably missing the idea how to do it in the current consumer
> > setting.
> > >
> > >
> > > For differentiating session timeout and registration timeout, I would
> try
> > > to enhance the documentation in the first stage to see how people react
> > to
> > > the confusion (would be great if they feel straightforward!). Since one
> > > doesn't have to fully understand the difference unless defining the new
> > > config "member name", for current users we could buy some time to
> listen
> > to
> > > their understandings and improve our documentation correspondingly in
> the
> > > follow-up KIPs.
> > >
> > >
> > > Boyang
> > >
> > > ________________________________
> > > From: Mayuresh Gharat <gharatmayuresh15@gmail.com>
> > > Sent: Sunday, November 11, 2018 1:06 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > > specifying member id
> > >
> > > Hi Boyang,
> > >
> > > Thanks for the reply.
> > >
> > > Please find the replies inline below :
> > > For having a consumer config at runtime, I think it's not necessary to
> > > address in this KIP because most companies run sidecar jobs through
> > daemon
> > > software like puppet. It should be easy to change the config through
> > script
> > > or UI without actual code change. We still want to leave flexibility
> for
> > > user to define member name as they like.
> > > ---- This might be little different for companies that use
> configuration
> > > management tools that does not allow the applications to define/change
> > the
> > > configs dynamically. For example, if we use something similar to spring
> > to
> > > pull in the configs for the KafkaConsumer and pass it to the
> constructor
> > to
> > > create the KafkaConsumer object, it will be hard to specify a unique
> > value
> > > to the "MEMBER_NAME" config unless someone deploying the app generates
> a
> > > unique string for this config outside the deployment workflow and
> copies
> > it
> > > statically before starting up each consumer instance. Unless we can
> > loosen
> > > the criteria for uniqueness of this config value, for each consumer
> > > instance in the consumer group, I am not sure of a better way of
> > > addressing this. If we don't want to loosen the criteria, then
> providing
> > a
> > > dynamic way to pass this in at runtime, would put the onus of having
> the
> > > same unique value each time a consumer is restarted, on to the
> > application
> > > that is running the consumer.
> > >
> > > I just updated the kip about having both "registration timeout" and
> > > "session timeout". The benefit of having two configs instead of one is
> to
> > > reduce the mental burden for operation, for example user just needs to
> > > unset "member name" to cast back to dynamic membership without worrying
> > > about tuning the "session timeout" back to a smaller value.
> > > --- That is a good point. I was thinking, if both the configs are
> > > specified, it would be confusing for the end user without understanding
> > the
> > > internals of the consumer and its interaction with group coordinator,
> as
> > > which takes precedence when and how it affects the consumer behavior.
> > Just
> > > my 2 cents.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Nov 9, 2018 at 8:27 PM Boyang Chen <bchen11@outlook.com>
> wrote:
> > >
> > > > Hey Mayuresh,
> > > >
> > > >
> > > > thanks for the thoughtful questions! Let me try to answer your
> > questions
> > > > one by one.
> > > >
> > > >
> > > > For having a consumer config at runtime, I think it's not necessary
> to
> > > > address in this KIP because most companies run sidecar jobs through
> > > daemon
> > > > software like puppet. It should be easy to change the config through
> > > script
> > > > or UI without actual code change. We still want to leave flexibility
> > for
> > > > user to define member name as they like.
> > > >
> > > >
> > > > I just updated the kip about having both "registration timeout" and
> > > > "session timeout". The benefit of having two configs instead of one
> is
> > to
> > > > reduce the mental burden for operation, for example user just needs
> to
> > > > unset "member name" to cast back to dynamic membership without
> worrying
> > > > about tuning the "session timeout" back to a smaller value.
> > > >
> > > >
> > > > For backup topic, I think it's a low-level detail which could be
> > > addressed
> > > > in the implementation. I feel no preference of adding a new topic vs
> > > reuse
> > > > consumer offsets topic. I will do more analysis and make a trade-off
> > > > comparison. Nice catch!
> > > >
> > > >
> > > > I hope the explanations make sense to you. I will keep polishing on
> the
> > > > edge cases and details.
> > > >
> > > >
> > > > Best,
> > > >
> > > > Boyang
> > > >
> > > > ________________________________
> > > > From: Mayuresh Gharat <gharatmayuresh15@gmail.com>
> > > > Sent: Saturday, November 10, 2018 10:25 AM
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances
> by
> > > > specifying member id
> > > >
> > > > Hi Boyang,
> > > >
> > > > Thanks for the KIP and sorry for being late to the party. This KIP is
> > > > really useful for us at Linkedin.
> > > >
> > > > I had a few questions :
> > > >
> > > > The idea of having static member name seems nice, but instead of a
> > > config,
> > > > would it be possible for it to be passed in to the consumer at
> runtime?
> > > > This is because an app might want to decide the config value at
> runtime
> > > > using its host information for example, to generate the unique member
> > > name.
> > > >
> > > > Also the KIP talks about using the "REGISTRATION_TIMEOUT_MS". I was
> > > > wondering if we can reuse the session timeout here. This might help
> us
> > to
> > > > have one less config on the consumer.
> > > >
> > > > The KIP also talks about adding another internal topic
> > > "static_member_map".
> > > > Would the semantics (GroupCoordinator broker, topic configs) be the
> > same
> > > as
> > > > __consumer_offsets topic?
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > >
> > > > On Wed, Nov 7, 2018 at 12:17 AM Boyang Chen <bchen11@outlook.com>
> > wrote:
> > > >
> > > > > I took a quick pass of the proposal. First I would say it's a very
> > > > > brilliant initiative from Konstantine and Confluent folks. To draft
> > up
> > > a
> > > > > proposal like this needs deep understanding of the rebalance
> > protocol!
> > > I
> > > > > summarized some thoughts here.
> > > > >
> > > > >
> > > > > Overall the motivations of the two proposals align on that:
> > > > >
> > > > >   1.  Both believe the invariant resource (belonging to the same
> > > process)
> > > > > should be preserved across rebalance.
> > > > >   2.  Transit failures (K8 thread death) shouldn't trigger resource
> > > > > redistribution. I don't use rebalance here since part one of the
> > > > > cooperative proposal could potentially introduce more rebalances
> but
> > > only
> > > > > on must-move resources.
> > > > >   3.  Scale up/down and rolling bounce are causing unnecessary
> > resource
> > > > > shuffling that need to be mitigated.
> > > > >
> > > > >
> > > > > On motivation level, I think both approach could solve/mitigate the
> > > above
> > > > > issues. They are just different in design philosophy, or I would
> say
> > > the
> > > > > perspective difference between framework user and algorithm
> designer.
> > > > >
> > > > >
> > > > > Two proposals have different focuses. KIP-345 is trying to place
> more
> > > > > fine-grained control on the broker side to reduce the unnecessary
> > > > > rebalances, while keeping the client logic intact. This is pretty
> > > > intuitive
> > > > > cause-effect for normal developers who are not very familiar with
> > > > rebalance
> > > > > protocol. As a developer working with Kafka Streams daily, I'd be
> > happy
> > > > to
> > > > > see a simplified rebalance protocol and just focus on maintaining
> the
> > > > > stream/consumer jobs. Too many rebalances raised my concern on the
> > job
> > > > > health. To be concise, static membership has the advantage of
> > reducing
> > > > > mental burden.
> > > > >
> > > > >
> > > > > Cooperative proposal takes thoughtful approach on client side. We
> > want
> > > to
> > > > > have fine-grained control on the join/exit group behaviors and make
> > the
> > > > > current dynamic membership better to address above issues. I do
> feel
> > > our
> > > > > idea crossed on the delayed rebalance when we scale up/down, which
> > > could
> > > > > potentially reduce the state shuffling and decouple the behavior
> from
> > > > > session timeout which is already overloaded.  In this sense, I
> > believe
> > > > both
> > > > > approaches would serve well in making "reasonable rebalance" happen
> > at
> > > > the
> > > > > "right timing".
> > > > >
> > > > >
> > > > > However, based on my understanding, either 345 or cooperative
> > > rebalancing
> > > > > is not solving the problem Mike has proposed: could we do a better
> > job
> > > at
> > > > > scaling up/down in ideal timing? My initial response was to
> introduce
> > > an
> > > > > admin API which now I feel is sub-optimal, in that the goal of
> smooth
> > > > > transition is to make sure the newly up hosts are actually "ready".
> > For
> > > > > example:
> > > > >
> > > > >
> > > > > We have 4 instance reading from 8 topic partitions (= 8 tasks). At
> > some
> > > > > time we would like to scale up to 8 hosts, with the current
> > > improvements
> > > > we
> > > > > could reduce 4 potential rebalances to a single one. But the new
> > hosts
> > > > are
> > > > > yet unknown to be "ready" if they need to reconstruct the local
> > state.
> > > To
> > > > > be actually ready, we need 4 standby tasks running on those empty
> > hosts
> > > > and
> > > > > leader needs to wait for the signal of "replay/reconstruct
> complete"
> > to
> > > > > actually involve them into the main consumer group. Otherwise,
> > > rebalance
> > > > > just kills our performance since we need to wait indefinite long
> for
> > > task
> > > > > migration.
> > > > >
> > > > >
> > > > > The scale down is also tricky such that we are not able to define a
> > > > "true"
> > > > > leave of a member. Rebalance immediately after "true" leaves are
> most
> > > > > optimal comparing with human intervention. Does this make sense?
> > > > >
> > > > >
> > > > > My intuition is that cooperative approach which was implemented on
> > the
> > > > > client side could better handle scaling cases than KIP 345, since
> it
> > > > > involves a lot of algorithmic changes to define "replaying" stage,
> > > which
> > > > I
> > > > > feel would over-complicate broker logic if implemented on
> > coordinator.
> > > If
> > > > > we let 345 focus on reducing unnecessary rebalance, and let
> > cooperative
> > > > > approach focus on judging best timing of scale up/down, the two
> > efforts
> > > > > could be aligned. In long term, I feel the more complex improvement
> > of
> > > > > consumer protocol should happen on client side instead of server
> side
> > > > which
> > > > > is easier to test and has less global impact for the entire Kafka
> > > > > production cluster.
> > > > >
> > > > >
> > > > > Thanks again to Konstantine, Matthias and other folks in coming up
> > with
> > > > > this great client proposal. This is great complementation to KIP
> 345.
> > > In
> > > > a
> > > > > high level, we are not having any collision on the path and both
> > > > proposals
> > > > > are making sense here. Just need better sync to avoid duplicate
> > effort
> > > :)
> > > > >
> > > > >
> > > > > Best,
> > > > >
> > > > > Boyang
> > > > >
> > > > >
> > > > > ________________________________
> > > > > From: Boyang Chen <bchen11@outlook.com>
> > > > > Sent: Wednesday, November 7, 2018 1:57 PM
> > > > > To: dev@kafka.apache.org
> > > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances
> > by
> > > > > specifying member id
> > > > >
> > > > > Thanks Matthias for bringing this awesome proposal up! I shall
> take a
> > > > > deeper look and make a comparison between the two proposals.
> > > > >
> > > > >
> > > > > Meanwhile for the scale down specifically for stateful streaming,
> we
> > > > could
> > > > > actually introduce a new status called "learner" where the newly up
> > > hosts
> > > > > could try to catch up with the assigned task progress first before
> > > > > triggering the rebalance, from which we don't see a sudden dip on
> the
> > > > > progress. However, it is built on top of the success of KIP-345.
> > > > >
> > > > >
> > > > > ________________________________
> > > > > From: Matthias J. Sax <matthias@confluent.io>
> > > > > Sent: Wednesday, November 7, 2018 7:02 AM
> > > > > To: dev@kafka.apache.org
> > > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances
> > by
> > > > > specifying member id
> > > > >
> > > > > Hey,
> > > > >
> > > > > > there was quite a pause on this KIP discussion and in the mean time, a
> > > > > > new design for incremental cooperative rebalance was suggested:
> > > > >
> > > > >
> > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing%3A+Support+and+Policies
> > > > >
> > > > >
> > > > > > We should make sure that the proposal and this KIP align to each other.
> > > > > > Thoughts?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 11/5/18 7:31 PM, Boyang Chen wrote:
> > > > > > Hey Mike,
> > > > > >
> > > > > >
> > > > > > thanks for the feedback, the two question are very thoughtful!
> > > > > >
> > > > > >
> > > > > > >> 1) I am a little confused about the distinction for the leader. If
> > > > > > >> the consumer node that was assigned leader does a bounce (goes down
> > > > > > >> and quickly comes up) to update application code, will a rebalance
> > > > > > >> be triggered? I do not think a bounce of the leader should trigger
> > > > > > >> a rebalance.
> > > > > > >
> > > > > > > For Q1 my intention was to minimize the change within one KIP, since
> > > > > > > the leader rejoining case could be addressed separately.
> > > > > >
> > > > > >
> > > > > > >> 2) The timeout for shrink up makes a lot of sense and allows to
> > > > > > >> gracefully increase the number of nodes in the cluster. I think we
> > > > > > >> need to support graceful shrink down as well. If I set the
> > > > > > >> registration timeout to 5 minutes to handle rolling restarts or
> > > > > > >> intermittent failures without shuffling state, I don't want to wait
> > > > > > >> 5 minutes in order for the group to rebalance if I am intentionally
> > > > > > >> removing a node from the cluster. I am not sure the best way to do
> > > > > > >> this. One idea I had was adding the ability for a CLI or Admin API
> > > > > > >> to force a rebalance of the group. This would allow for an admin to
> > > > > > >> trigger the rebalance manually without waiting the entire
> > > > > > >> registration timeout on shrink down. What do you think?
> > > > > >
> > > > > > > For 2) my understanding is that for scaling down case it is better to
> > > > > > > be addressed by CLI tool than code logic, since only by human
> > > > > > > evaluation we could decide whether it is a "right timing" -- the time
> > > > > > > when all the scaling down consumers are offline -- to kick in
> > > > > > > rebalance. Unless we introduce another term on coordinator which
> > > > > > > indicates the target consumer group size, broker will find it hard to
> > > > > > > decide when to start rebalance. So far I prefer to hold the
> > > > > > > implementation for that, but agree we could discuss whether we want
> > > > > > > to introduce admin API in this KIP or a separate one.
> > > > > >
> > > > > >
> > > > > > Thanks again for the proposed ideas!
> > > > > >
> > > > > >
> > > > > > Boyang
> > > > > >
> > > > > > ________________________________
> > > > > > From: Mike Freyberger <mike.freyberger@xandr.com>
> > > > > > Sent: Monday, November 5, 2018 6:13 AM
> > > > > > To: dev@kafka.apache.org
> > > > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> rebalances
> > > by
> > > > > specifying member id
> > > > > >
> > > > > > Boyang,
> > > > > >
> > > > > > Thanks for updating the KIP. It's shaping up well. Two things:
> > > > > >
> > > > > > > 1) I am a little confused about the distinction for the leader. If the
> > > > > > > consumer node that was assigned leader does a bounce (goes down and
> > > > > > > quickly comes up) to update application code, will a rebalance be
> > > > > > > triggered? I do not think a bounce of the leader should trigger a
> > > > > > > rebalance.
> > > > > > >
> > > > > > > 2) The timeout for shrink up makes a lot of sense and allows to
> > > > > > > gracefully increase the number of nodes in the cluster. I think we need
> > > > > > > to support graceful shrink down as well. If I set the registration
> > > > > > > timeout to 5 minutes to handle rolling restarts or intermittent
> > > > > > > failures without shuffling state, I don't want to wait 5 minutes in
> > > > > > > order for the group to rebalance if I am intentionally removing a node
> > > > > > > from the cluster. I am not sure the best way to do this. One idea I had
> > > > > > > was adding the ability for a CLI or Admin API to force a rebalance of
> > > > > > > the group. This would allow for an admin to trigger the rebalance
> > > > > > > manually without waiting the entire registration timeout on shrink
> > > > > > > down. What do you think?
> > > > > >
> > > > > > Mike
> > > > > >
> > > > > > > On 10/30/18, 1:55 AM, "Boyang Chen" <bchen11@outlook.com> wrote:
> > > > > >
> > > > > > >     Btw, I updated KIP 345 based on my understanding. Feel free to
> > > > > > >     take another round of look:
> > > > > >
> > > > > >
> > > > >
> > > > > > >     https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
> > > > > >
> > > > > >     ________________________________
> > > > > >     From: Boyang Chen <bchen11@outlook.com>
> > > > > >     Sent: Monday, October 29, 2018 12:34 PM
> > > > > >     To: dev@kafka.apache.org
> > > > > >     Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > rebalances
> > > > > by specifying member id
> > > > > >
> > > > > >     Thanks everyone for the input on this thread! (Sorry it's
> been
> > a
> > > > > while) I feel that we are very close to the final solution.
> > > > > >
> > > > > >
> > > > > >     Hey Jason and Mike, I have two quick questions on the new
> > > features
> > > > > here:
> > > > > >
> > > > > >       1.  so our proposal is that until we add a new static
> member
> > > into
> > > > > the group (scale up), we will not trigger rebalance until the
> > > > "registration
> > > > > timeout"( the member has been offline for too long)? How about
> > leader's
> > > > > rejoin request, I think we should still trigger rebalance when that
> > > > > happens, since the consumer group may have new topics to consume?
> > > > > >       2.  I'm not very clear on the scale up scenario in static
> > > > > > membership here. Should we fall back to dynamic membership while
> > > > > adding/removing hosts (by setting member.name = null), or we still
> > > want
> > > > > to add instances with `member.name` so that we eventually
> > > expand/shrink
> > > > > the static membership? I personally feel the easier solution is to
> > spin
> > > > up
> > > > > new members and wait until either the same "registration timeout"
> or
> > a
> > > > > "scale up timeout" before starting the rebalance. What do you
> think?
> > > > > >
> > > > > >     Meanwhile I will go ahead to make changes to the KIP with our
> > > newly
> > > > > discussed items and details. Really excited to see the design has
> > > become
> > > > > more solid.
> > > > > >
> > > > > >     Best,
> > > > > >     Boyang
> > > > > >
> > > > > >     ________________________________
> > > > > >     From: Jason Gustafson <jason@confluent.io>
> > > > > >     Sent: Saturday, August 25, 2018 6:04 AM
> > > > > >     To: dev
> > > > > >     Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > rebalances
> > > > > by specifying member id
> > > > > >
> > > > > >     Hey Mike,
> > > > > >
> > > > > >     Yeah, that's a good point. A long "registration timeout" may
> > not
> > > be
> > > > > a great
> > > > > >     idea. Perhaps in practice you'd set it long enough to be able
> > to
> > > > > detect a
> > > > > >     failure and provision a new instance. Maybe on the order of
> 10
> > > > > minutes is
> > > > > >     more reasonable.
> > > > > >
> > > > > >     In any case, it's probably a good idea to have an
> > administrative
> > > > way
> > > > > to
> > > > > >     force deregistration. One option is to extend the
> DeleteGroups
> > > API
> > > > > with a
> > > > > >     list of members names.
> > > > > >
> > > > > >     -Jason
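
The forced deregistration idea above can be pictured with a minimal Python sketch. Everything here is hypothetical: `force_deregister` and the dict-based registry are illustrative names, not a Kafka API, and the thread only floats extending DeleteGroups with a list of member names. The sketch assumes that removing a static member releases its partitions right away instead of waiting for the registration timeout.

```python
from typing import Dict, Iterable, List, Tuple


def force_deregister(
    registrations: Dict[str, List[str]],
    member_names: Iterable[str],
) -> Tuple[Dict[str, List[str]], List[str]]:
    """Remove the named static members without waiting for the registration timeout.

    `registrations` maps a static member name to the partitions it owns
    (hypothetical structure). Returns the remaining registrations and the
    partitions that were released; a non-empty list means the group would
    need a rebalance to reassign them.
    """
    to_remove = set(member_names)
    remaining: Dict[str, List[str]] = {}
    released: List[str] = []
    for name, partitions in registrations.items():
        if name in to_remove:
            released.extend(partitions)
        else:
            remaining[name] = list(partitions)
    return remaining, released
```

Unknown names are ignored rather than rejected, which is one possible design choice; an admin tool could equally report them as errors.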
> > > > > >
> > > > > >
> > > > > >
> > > > > >     On Fri, Aug 24, 2018 at 2:21 PM, Mike Freyberger <
> > > > > mfreyberger@appnexus.com>
> > > > > >     wrote:
> > > > > >
> > > > > >     > Jason,
> > > > > >     >
> > > > > >     > Regarding step 4 in your proposal which suggests beginning
> a
> > > long
> > > > > timer
> > > > > >     > (30 minutes) when a static member leaves the group, would
> > there
> > > > > also be the
> > > > > >     > ability for an admin to force a static membership
> expiration?
> > > > > >     >
> > > > > >     > I'm thinking that during particular types of outages or
> > > upgrades
> > > > > users
> > > > > > >     > would want to forcefully remove a static member from the
> group.
> > > > > >     >
> > > > > >     > So the user would shut the consumer down normally, which
> > > wouldn't
> > > > > trigger
> > > > > >     > a rebalance. Then the user could use an admin CLI tool to
> > force
> > > > > remove that
> > > > > >     > consumer from the group, so the TopicPartitions that were
> > > > > previously owned
> > > > > >     > by that consumer can be released.
> > > > > >     >
> > > > > >     > At a high level, we need consumer groups to gracefully
> handle
> > > > > intermittent
> > > > > >     > failures and permanent failures. Currently, the consumer
> > group
> > > > > protocol
> > > > > >     > handles permanent failures well, but does not handle
> > > intermittent
> > > > > failures
> > > > > >     > well (it creates unnecessary rebalances). I want to make
> sure
> > > the
> > > > > overall
> > > > > >     > solution here handles both intermittent failures and
> > permanent
> > > > > failures,
> > > > > >     > rather than sacrificing support for permanent failures in
> > order
> > > > to
> > > > > provide
> > > > > >     > support for intermittent failures.
> > > > > >     >
> > > > > >     > Mike
> > > > > >     >
> > > > > >     > Sent from my iPhone
> > > > > >     >
> > > > > >     > > On Aug 24, 2018, at 3:03 PM, Jason Gustafson <
> > > > jason@confluent.io>
> > > > > wrote:
> > > > > >     > >
> > > > > >     > > Hey Guozhang,
> > > > > >     > >
> > > > > >     > > Responses below:
> > > > > >     > >
> > > > > >     > > Originally I was trying to kill more birds with one stone
> > > with
> > > > > KIP-345,
> > > > > >     > >> e.g. to fix the multi-rebalance issue on starting up /
> > > > shutting
> > > > > down a
> > > > > >     > >> multi-instance client (mentioned as case 1)/2) in my
> earlier
> > > > > email), and
> > > > > >     > >> hence proposing to have a pure static-membership
> protocol.
> > > But
> > > > > thinking
> > > > > >     > >> twice about it I now feel it may be too ambitious and
> > worth
> > > > > fixing in
> > > > > >     > >> another KIP.
> > > > > >     > >
> > > > > >     > >
> > > > > >     > > I was considering an extension to support
> > pre-initialization
> > > of
> > > > > the
> > > > > >     > static
> > > > > >     > > members of the group, but I agree we should probably
> leave
> > > this
> > > > > problem
> > > > > >     > for
> > > > > >     > > future work.
> > > > > >     > >
> > > > > > >     > > 1. How is this longish static member expiration timeout
> > defined?
> > > > Is
> > > > > it via a
> > > > > >     > >> broker, hence global config, or via a client config
> which
> > > can
> > > > be
> > > > > >     > >> communicated to broker via JoinGroupRequest?
> > > > > >     > >
> > > > > >     > >
> > > > > >     > > I am not too sure. I tend to lean toward server-side
> > configs
> > > > > because they
> > > > > >     > > are easier to evolve. If we have to add something to the
> > > > > protocol, then
> > > > > >     > > we'll be stuck with it forever.
> > > > > >     > >
> > > > > >     > > 2. Assuming that for static members, LEAVE_GROUP request
> > will
> > > > not
> > > > > >     > trigger a
> > > > > >     > >> rebalance immediately either, similar to session
> timeout,
> > > but
> > > > > only the
> > > > > >     > >> longer member expiration timeout, can we remove the
> > > internal "
> > > > > >     > >> internal.leave.group.on.close" config, which is a quick
> > > > > workaround
> > > > > >     > then?
> > > > > >     > >
> > > > > >     > >
> > > > > >     > > Yeah, I hope we can ultimately get rid of it, but we may
> > need
> > > > it
> > > > > for
> > > > > >     > > compatibility with older brokers. A related question is
> > what
> > > > > should be
> > > > > >     > the
> > > > > >     > > behavior of the consumer if `member.name` is provided
> but
> > > the
> > > > > broker
> > > > > >     > does
> > > > > >     > > not support it? We could either fail or silently
> downgrade
> > to
> > > > > dynamic
> > > > > >     > > membership.
> > > > > >     > >
> > > > > >     > > -Jason
> > > > > >     > >
> > > > > >     > >
> > > > > >     > >> On Fri, Aug 24, 2018 at 11:44 AM, Guozhang Wang <
> > > > > wangguoz@gmail.com>
> > > > > >     > wrote:
> > > > > >     > >>
> > > > > >     > >> Hey Jason,
> > > > > >     > >>
> > > > > >     > >> I like your idea to simplify the upgrade protocol to
> allow
> > > > > > coexistence of
> > > > > >     > >> static and dynamic members. Admittedly it may make the
> > > > > coordinator-side
> > > > > > >     > >> logic a bit more complex, but I think it is worth doing.
> > > > > >     > >>
> > > > > >     > >> Originally I was trying to kill more birds with one
> stone
> > > with
> > > > > KIP-345,
> > > > > >     > >> e.g. to fix the multi-rebalance issue on starting up /
> > > > shutting
> > > > > down a
> > > > > >     > >> multi-instance client (mentioned as case 1)/2) in my
> earlier
> > > > > email), and
> > > > > >     > >> hence proposing to have a pure static-membership
> protocol.
> > > But
> > > > > thinking
> > > > > >     > >> twice about it I now feel it may be too ambitious and
> > worth
> > > > > fixing in
> > > > > >     > >> another KIP. With that, I think what you've proposed
> here
> > > is a
> > > > > good way
> > > > > >     > to
> > > > > >     > >> go for KIP-345 itself.
> > > > > >     > >>
> > > > > >     > >> Note there are a few details in your proposal we'd still
> > > need
> > > > > to figure
> > > > > >     > >> out:
> > > > > >     > >>
> > > > > > >     > >> 1. How is this longish static member expiration timeout
> > > defined?
> > > > > Is it via
> > > > > >     > a
> > > > > >     > >> broker, hence global config, or via a client config
> which
> > > can
> > > > be
> > > > > >     > >> communicated to broker via JoinGroupRequest?
> > > > > >     > >>
> > > > > >     > >> 2. Assuming that for static members, LEAVE_GROUP request
> > > will
> > > > > not
> > > > > >     > trigger a
> > > > > >     > >> rebalance immediately either, similar to session
> timeout,
> > > but
> > > > > only the
> > > > > >     > >> longer member expiration timeout, can we remove the
> > > internal "
> > > > > >     > >> internal.leave.group.on.close" config, which is a quick
> > > > > workaround
> > > > > >     > then?
> > > > > >     > >>
> > > > > >     > >>
> > > > > >     > >>
> > > > > >     > >> Guozhang
> > > > > >     > >>
> > > > > >     > >>
> > > > > >     > >> On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson <
> > > > > jason@confluent.io>
> > > > > >     > >> wrote:
> > > > > >     > >>
> > > > > >     > >>> Hey All,
> > > > > >     > >>>
> > > > > >     > >>> Nice to see some solid progress on this. It sounds like
> > one
> > > > of
> > > > > the
> > > > > >     > >>> complications is allowing static and dynamic
> registration
> > > to
> > > > > coexist.
> > > > > >     > I'm
> > > > > >     > >>> wondering if we can do something like the following:
> > > > > >     > >>>
> > > > > >     > >>> 1. Statically registered members (those joining the
> group
> > > > with
> > > > > a
> > > > > >     > >> non-null `
> > > > > >     > >>> member.name`) maintain a session with the coordinator
> > just
> > > > > like
> > > > > >     > dynamic
> > > > > >     > >>> members.
> > > > > >     > >>> 2. If a session is active for a static member when a
> > > > rebalance
> > > > > begins,
> > > > > >     > >> then
> > > > > >     > >>> basically we'll keep the current behavior. The
> rebalance
> > > will
> > > > > await the
> > > > > >     > >>> static member joining the group.
> > > > > >     > >>> 3. If a static member does not have an active session,
> > then
> > > > the
> > > > > >     > >> coordinator
> > > > > >     > >>> will not wait for it to join, but will still include it
> > in
> > > > the
> > > > > >     > rebalance.
> > > > > >     > >>> The coordinator will forward the cached subscription
> > > > > information to the
> > > > > >     > >>> leader and will cache the assignment after the
> rebalance
> > > > > completes.
> > > > > >     > (Note
> > > > > >     > >>> that we still have the generationId to fence offset
> > commits
> > > > > from a
> > > > > >     > static
> > > > > >     > >>> zombie if the assignment changes.)
> > > > > >     > >>> 4. When a static member leaves the group or has its
> > session
> > > > > expire, no
> > > > > >     > >>> rebalance is triggered. Instead, we can begin a timer
> to
> > > > > expire the
> > > > > >     > >> static
> > > > > >     > >>> registration. This would be a longish timeout (like 30
> > > > minutes
> > > > > say).
> > > > > >     > >>>
> > > > > >     > >>> So basically static members participate in all
> rebalances
> > > > > regardless
> > > > > >     > >>> whether they have an active session. In a given
> > rebalance,
> > > > > some of the
> > > > > >     > >>> members may be static and some dynamic. The group
> leader
> > > can
> > > > > >     > >> differentiate
> > > > > >     > >>> the two based on the presence of the `member.name` (we
> > > have
> > > > > to add
> > > > > >     > this
> > > > > >     > >> to
> > > > > >     > >>> the JoinGroupResponse). Generally speaking, we would
> > choose
> > > > > leaders
> > > > > >     > >>> preferentially from the active members that support the
> > > > latest
> > > > > >     > JoinGroup
> > > > > >     > >>> protocol and are using static membership. If we have to
> > > > choose
> > > > > a leader
> > > > > >     > >>> with an old version, however, it would see all members
> in
> > > the
> > > > > group
> > > > > >     > >> (static
> > > > > >     > >>> or dynamic) as dynamic members and perform the
> assignment
> > > as
> > > > > usual.
> > > > > >     > >>>
> > > > > >     > >>> Would that work?
> > > > > >     > >>>
> > > > > >     > >>> -Jason
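
As a rough illustration of the four steps above, here is a minimal Python sketch of the coordinator-side bookkeeping. The class and method names (`Coordinator`, `session_lost`, `tick`) are hypothetical and do not come from the Kafka code base. The sketch assumes that a returning static member resumes its registration without a rebalance, and that only a new registration or an expired one changes group membership.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class StaticMember:
    member_name: str
    session_active: bool = True
    # Time at which the static registration expires; None while the session is active.
    registration_deadline: Optional[float] = None


@dataclass
class Coordinator:
    # The "longish" timeout from step 4 (30 minutes is the value floated in the thread).
    registration_timeout_s: float = 30 * 60
    static_members: Dict[str, StaticMember] = field(default_factory=dict)
    rebalances: int = 0

    def join(self, member_name: str) -> None:
        member = self.static_members.get(member_name)
        if member is None:
            # New static member: membership changes, so a rebalance is counted.
            self.static_members[member_name] = StaticMember(member_name)
            self.rebalances += 1
        else:
            # Known static member returning: resume the session, cancel the timer.
            member.session_active = True
            member.registration_deadline = None

    def session_lost(self, member_name: str, now: float) -> None:
        # Step 4: a leave or session expiry starts the timer but triggers no rebalance.
        member = self.static_members[member_name]
        member.session_active = False
        member.registration_deadline = now + self.registration_timeout_s

    def tick(self, now: float) -> None:
        # Drop registrations whose timer has run out; each expiry changes membership.
        expired = [
            name
            for name, m in self.static_members.items()
            if m.registration_deadline is not None and now >= m.registration_deadline
        ]
        for name in expired:
            del self.static_members[name]
            self.rebalances += 1
```

A member that bounces and rejoins before the deadline leaves the rebalance count unchanged, which is the behavior the proposal is after for rolling restarts.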
> > > > > >     > >>>
> > > > > >     > >>>
> > > > > >     > >>> On Thu, Aug 23, 2018 at 5:26 PM, Guozhang Wang <
> > > > > wangguoz@gmail.com>
> > > > > >     > >> wrote:
> > > > > >     > >>>
> > > > > >     > >>>> Hello Boyang,
> > > > > >     > >>>>
> > > > > >     > >>>> Thanks for the updated proposal, a few questions:
> > > > > >     > >>>>
> > > > > >     > >>>> 1. Where will "change-group-timeout" be communicated
> to
> > > the
> > > > > broker?
> > > > > >     > >> Will
> > > > > >     > >>>> that be a new field in the JoinGroupRequest, or are we
> > > going
> > > > > to
> > > > > >     > >>> piggy-back
> > > > > >     > >>>> on the existing session-timeout field (assuming that
> the
> > > > > original
> > > > > >     > value
> > > > > >     > >>>> will not be used anywhere in the static membership any
> > > > more)?
> > > > > >     > >>>>
> > > > > >     > >>>> 2. "However, if the consumer takes longer than session
> > > > > timeout to
> > > > > >     > >> return,
> > > > > >     > >>>> we shall still trigger rebalance but it could still
> try
> > to
> > > > > catch
> > > > > >     > >>>> `change-group-timeout`.": what does this mean? I
> thought
> > > > your
> > > > > proposal
> > > > > >     > >> is
> > > > > >     > >>>> that for static memberships, the broker will NOT
> trigger
> > > > > rebalance
> > > > > >     > even
> > > > > >     > >>>> after session-timeout has been detected, but only that
> > > after
> > > > > >     > >>>> change-group-timeout
> > > > > >     > >>>> which is supposed to be longer than session-timeout to
> > be
> > > > > defined?
> > > > > >     > >>>>
> > > > > >     > >>>> 3. "A join group request with member.name set will be
> > > > > treated as
> > > > > >     > >>>> `static-membership` strategy", in this case, how would
> > the
> > > > > switch from
> > > > > >     > >>>> dynamic to static happen, since whoever changed the
> > > > > member.name to
> > > > > >     > >>>> not-null
> > > > > >     > >>>> will be rejected, right?
> > > > > >     > >>>>
> > > > > >     > >>>> 4. "just erase the cached mapping, and wait for
> session
> > > > > timeout to
> > > > > >     > >>> trigger
> > > > > >     > >>>> rebalance should be sufficient." this is also a bit
> > > unclear
> > > > > to me: who
> > > > > >     > >>> will
> > > > > >     > >>>> erase the cached mapping? Since it is on the
> > broker-side I
> > > > > assume that
> > > > > >     > >>>> broker has to do it. Are you suggesting to use a new
> > > request
> > > > > for it?
> > > > > >     > >>>>
> > > > > >     > >>>> 5. "Halfway switch": following 3) above, if your
> > proposal
> > > is
> > > > > basically
> > > > > >     > >> to
> > > > > >     > >>>> let "first join-request wins", and the strategy will
> > stay
> > > as
> > > > > is until
> > > > > >     > >> all
> > > > > >     > >>>> members are gone, then this will also not happen since
> > > > > whoever used
> > > > > >     > >>>> different strategy as the first guy who sends
> join-group
> > > > > request will
> > > > > >     > >> be
> > > > > >     > >>>> rejected right?
> > > > > >     > >>>>
> > > > > >     > >>>>
> > > > > >     > >>>> Guozhang
> > > > > >     > >>>>
> > > > > >     > >>>>
> > > > > >     > >>>> On Tue, Aug 21, 2018 at 9:28 AM, John Roesler <
> > > > > john@confluent.io>
> > > > > >     > >> wrote:
> > > > > >     > >>>>
> > > > > >     > >>>>> This sounds good to me!
> > > > > >     > >>>>>
> > > > > >     > >>>>> Thanks for the time you've spent on it,
> > > > > >     > >>>>> -John
> > > > > >     > >>>>>
> > > > > >     > >>>>> On Tue, Aug 21, 2018 at 12:13 AM Boyang Chen <
> > > > > bchen11@outlook.com>
> > > > > >     > >>>> wrote:
> > > > > >     > >>>>>
> > > > > >     > >>>>>> Thanks Matthias for the input. Sorry I was busy
> > recently
> > > > and
> > > > > >     > >> haven't
> > > > > >     > >>>> got
> > > > > >     > >>>>>> time to update this thread. To summarize what we
> have come
> > up with
> > > > so
> > > > > far,
> > > > > >     > >> here
> > > > > >     > >>>> is
> > > > > >     > >>>>> a
> > > > > >     > >>>>>> draft updated plan:
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> Introduce a new config called `member.name` which
> is
> > > > > supposed to
> > > > > >     > >> be
> > > > > >     > >>>>>> provided uniquely by the consumer client. The broker
> > > will
> > > > > maintain
> > > > > >     > >> a
> > > > > >     > >>>>> cache
> > > > > >     > >>>>>> with [key:member.name, value:member.id]. A join
> group
> > > > > request with
> > > > > >     > >>>>>> member.name set will be treated as
> > `static-membership`
> > > > > strategy,
> > > > > >     > >> and
> > > > > >     > >>>>> will
> > > > > >     > >>>>>> reject any join group request without member.name.
> So
> > > > this
> > > > > >     > >>>> coordination
> > > > > >     > >>>>>> change will be differentiated from the
> > > > `dynamic-membership`
> > > > > >     > >> protocol
> > > > > >     > >>> we
> > > > > >     > >>>>>> currently have.
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> When handling static join group request:
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>  1.   The broker will check the membership to see
> > > whether
> > > > > this is
> > > > > >     > >> a
> > > > > >     > >>>> new
> > > > > > >     > >>>>>> member. If new, broker allocates a unique member id,
> > > cache
> > > > > the
> > > > > >     > >> mapping
> > > > > >     > >>>> and
> > > > > >     > >>>>>> move to rebalance stage.
> > > > > >     > >>>>>>  2.   Following 1, if this is an existing member,
> > broker
> > > > > will not
> > > > > >     > >>>> change
> > > > > >     > >>>>>> group state, and return its cached member.id and
> > > current
> > > > > >     > >> assignment.
> > > > > >     > >>>>>> (unless this is leader, we shall trigger rebalance)
> > > > > >     > >>>>>>  3.   Although Guozhang has mentioned we could
> rejoin
> > > with
> > > > > pair
> > > > > >     > >>> member
> > > > > >     > >>>>>> name and id, I think for join group request it is ok
> > to
> > > > > leave
> > > > > >     > >> member
> > > > > >     > >>> id
> > > > > >     > >>>>>> blank as member name is the unique identifier. In
> > commit
> > > > > offset
> > > > > >     > >>> request
> > > > > >     > >>>>> we
> > > > > >     > >>>>>> *must* have both.
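
The join handling in steps 1 and 2 above could look roughly like the following Python sketch. The names `StaticGroup`, `JoinResult`, and `handle_join` are hypothetical, and treating the first joiner as leader is an assumption made only to keep the example small; the thread does not specify leader election here.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class JoinResult:
    member_id: str
    assignment: List[str]
    rebalance: bool


@dataclass
class StaticGroup:
    # Cache of [key: member.name, value: member.id] as described in the draft plan.
    name_to_id: Dict[str, str] = field(default_factory=dict)
    # Current assignment per member.id.
    assignments: Dict[str, List[str]] = field(default_factory=dict)
    leader_name: Optional[str] = None

    def handle_join(self, member_name: str) -> JoinResult:
        member_id = self.name_to_id.get(member_name)
        if member_id is None:
            # Step 1: new member -> allocate an id, cache the mapping, rebalance.
            member_id = f"{member_name}-{uuid.uuid4()}"
            self.name_to_id[member_name] = member_id
            if self.leader_name is None:
                # Assumption for this sketch: the first joiner becomes leader.
                self.leader_name = member_name
            return JoinResult(member_id, [], rebalance=True)
        # Step 2: known member -> return the cached id and current assignment.
        # A rejoin from the leader still triggers a rebalance.
        return JoinResult(
            member_id,
            list(self.assignments.get(member_id, [])),
            rebalance=(member_name == self.leader_name),
        )
```

The join request carries only the member name, matching step 3: the member id is looked up on the broker side rather than supplied by the client.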
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> When handling commit offset request, if enabled with
> > > > static
> > > > > >     > >>> membership,
> > > > > >     > >>>>>> each time the commit request must have both
> > member.name
> > > > and
> > > > > >     > >>> member.id
> > > > > >     > >>>> to
> > > > > >     > >>>>>> be identified as a `certificated member`. If not,
> this
> > > > > means there
> > > > > >     > >>> are
> > > > > >     > >>>>>> duplicate consumer members with same member name and
> > the
> > > > > request
> > > > > >     > >> will
> > > > > >     > >>>> be
> > > > > >     > >>>>>> rejected to guarantee consumption uniqueness.
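
The commit check described above can be sketched in a few lines of Python. The function name `is_certified_commit` is hypothetical, and the sketch assumes the broker holds the same member.name to member.id cache mentioned earlier in this message.

```python
from typing import Dict, Optional


def is_certified_commit(
    name_to_id: Dict[str, str],
    member_name: Optional[str],
    member_id: Optional[str],
) -> bool:
    """Accept an offset commit only if it carries both identifiers and they
    match the cached pair; a mismatch suggests a duplicate consumer that is
    using the same member name."""
    if not member_name or not member_id:
        return False
    return name_to_id.get(member_name) == member_id
```

For example, with a cache of `{"c-0": "id-1"}`, a commit from `("c-0", "id-1")` passes, while `("c-0", "id-2")` or a commit with a missing member id is rejected.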
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> When rolling restart/shutting down gracefully, the
> > > client
> > > > > will
> > > > > >     > >> send a
> > > > > >     > >>>>>> leave group request (static membership mode). In
> > static
> > > > > membership,
> > > > > >     > >>> we
> > > > > >     > >>>>> will
> > > > > >     > >>>>>> also define `change-group-timeout` to hold on
> > rebalance
> > > > > provided by
> > > > > >     > >>>>> leader.
> > > > > >     > >>>>>> So we will wait for all the members to rejoin the
> > group
> > > > and
> > > > > do
> > > > > >     > >>> exactly
> > > > > >     > >>>>> one
> > > > > >     > >>>>>> rebalance since all members are expected to rejoin
> > > within
> > > > > timeout.
> > > > > >     > >> If
> > > > > >     > >>>>>> consumer crashes, the join group request from the
> > > > restarted
> > > > > >     > >> consumer
> > > > > >     > >>>> will
> > > > > >     > >>>>>> be recognized as an existing member and be handled
> as
> > > > above
> > > > > >     > >> condition
> > > > > >     > >>>> 1;
> > > > > >     > >>>>>> However, if the consumer takes longer than session
> > > timeout
> > > > > to
> > > > > >     > >> return,
> > > > > >     > >>>> we
> > > > > >     > >>>>>> shall still trigger rebalance but it could still try
> > to
> > > > > catch
> > > > > >     > >>>>>> `change-group-timeout`. If it failed to catch second
> > > > > timeout, its
> > > > > >     > >>>> cached
> > > > > >     > >>>>>> state on broker will be garbage collected and
> trigger
> > a
> > > > new
> > > > > >     > >> rebalance
> > > > > >     > >>>>> when
> > > > > >     > >>>>>> it finally joins.
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> And consider the switch between dynamic to static
> > > > > membership.
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>  1.  Dynamic to static: the first joiner shall
> revise
> > > the
> > > > > >     > >> membership
> > > > > >     > >>>> to
> > > > > >     > >>>>>> static and wait for all the current members to
> > restart,
> > > > > since their
> > > > > >     > >>>>>> membership is still dynamic. Here our assumption is
> > that
> > > > the
> > > > > >     > >> restart
> > > > > >     > >>>>>> process shouldn't take a long time, as long restart
> is
> > > > > breaking the
> > > > > >     > >>>>>> `rebalance timeout` in whatever membership protocol
> we
> > > are
> > > > > using.
> > > > > >     > >>>> Before
> > > > > >     > >>>>>> restart, all dynamic member join requests will be
> > > > rejected.
> > > > > >     > >>>>>>  2.  Static to dynamic: this is more like a
> downgrade
> > > > which
> > > > > should
> > > > > >     > >>> be
> > > > > >     > >>>>>> smooth: just erase the cached mapping, and wait for
> > > > session
> > > > > timeout
> > > > > >     > >>> to
> > > > > >     > >>>>>> trigger rebalance should be sufficient. (Fallback to
> > > > current
> > > > > >     > >>> behavior)
> > > > > >     > >>>>>>  3.  Halfway switch: a corner case is like some
> > clients
> > > > keep
> > > > > >     > >> dynamic
> > > > > >     > >>>>>> membership while some keep static membership. This
> > will
> > > > > cause the
> > > > > >     > >>> group
> > > > > >     > >>>>>> rebalance forever without progress because
> > > dynamic/static
> > > > > states
> > > > > >     > >> are
> > > > > >     > >>>>>> bouncing each other. This could guarantee that we
> will
> > > not
> > > > > make the
> > > > > >     > >>>>>> consumer group work in a wrong state by having half
> > > static
> > > > > and half
> > > > > >     > >>>>> dynamic.
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> To guarantee correctness, we will also push the
> member
> > > > > name/id pair
> > > > > >     > >>> to
> > > > > > >     > >>>>>> __consumer_offsets topic (as Matthias pointed out)
> and
> > > > > upgrade the
> > > > > >     > >> API
> > > > > >     > >>>>>> version, these details will be further discussed
> back
> > in
> > > > > the KIP.
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>
> > > > > > >     > >>>>>> Are there any concerns about this high-level proposal?
> > Just
> > > > > want to
> > > > > >     > >>>>> reiterate
> > > > > >     > >>>>>> on the core idea of the KIP: "If the broker
> recognizes
> > > this
> > > > > consumer
> > > > > >     > >>> as
> > > > > >     > >>>> an
> > > > > >     > >>>>>> existing member, it shouldn't trigger rebalance".
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> Thanks a lot for everyone's input! I feel this
> > proposal
> > > is
> > > > > much
> > > > > >     > >> more
> > > > > >     > >>>>>> robust than previous one!
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> Best,
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> Boyang
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> ________________________________
> > > > > >     > >>>>>> From: Matthias J. Sax <matthias@confluent.io>
> > > > > >     > >>>>>> Sent: Friday, August 10, 2018 2:24 AM
> > > > > >     > >>>>>> To: dev@kafka.apache.org
> > > > > >     > >>>>>> Subject: Re: [DISCUSS] KIP-345: Reduce multiple
> > consumer
> > > > > rebalances
> > > > > >     > >>> by
> > > > > >     > >>>>>> specifying member id
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> Hi,
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> thanks for the detailed discussion. I learned a lot
> > > about
> > > > > internals
> > > > > >     > >>>> again
> > > > > >     > >>>>>> :)
> > > > > >     > >>>>>>
> > > > > > >     > >>>>>> I like the idea of a user config `member.name` and
> to
> > > > keep
> > > > > `
> > > > > >     > >>> member.id`
> > > > > >     > >>>>>> internal. Also agree with Guozhang, that reusing `
> > > > client.id`
> > > > > might
> > > > > >     > >>> not
> > > > > >     > >>>>>> be a good idea.
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> To clarify the algorithm, each time we generate a
> new
> > `
> > > > > member.id`,
> > > > > >     > >>> we
> > > > > >     > >>>>>> also need to update the "group membership"
> information
> > > > (ie,
> > > > > mapping
> > > > > >     > >>>>>> [member.id, Assignment]), right? Ie, the new `
> > member.id
> > > `
> > > > > replaces
> > > > > >     > >>> the
> > > > > >     > >>>>>> old entry in the cache.
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> I also think, we need to preserve the `member.name
> ->
> > > > > member.id`
> > > > > >     > >>>> mapping
> > > > > > >     > >>>>>> in the `__consumer_offsets` topic. The KIP should
> > mention
> > > > > this IMHO.
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> For changing the default value of config
> > > > > `leave.group.on.close`. I
> > > > > >     > >>>> agree
> > > > > >     > >>>>>> with John, that we should not change the default
> > config,
> > > > > because it
> > > > > >     > >>>>>> would impact all consumer groups with dynamic
> > > assignment.
> > > > > However,
> > > > > >     > >> I
> > > > > >     > >>>>>> think we can document, that if static assignment is
> > used
> > > > > (ie,
> > > > > >     > >>>>>> `member.name` is configured) we never send a
> > > > > LeaveGroupRequest
> > > > > >     > >>>>>> regardless of the config. Note, that the config is
> > > > > internal, so not
> > > > > >     > >>>> sure
> > > > > >     > >>>>>> how to document this in detail. We should not expose
> > the
> > > > > internal
> > > > > >     > >>>> config
> > > > > >     > >>>>>> in the docs.
> > > > > >     > >>>>>>
> > > > > > >     > >>>>>> About upgrading: why do we need to have two rolling
> > bounces
> > > > > and encode
> > > > > >     > >>>>>> "static" vs "dynamic" in the JoinGroupRequest?
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> If we upgrade an existing consumer group from
> dynamic
> > to
> > > > > static, I
> > > > > >     > >>>> don't
> > > > > >     > >>>>>> see any reason why both should not work together and
> > > > single
> > > > > rolling
> > > > > >     > >>>>>> bounce would not be sufficient? If we bounce the
> first
> > > > > consumer and
> > > > > >     > >>>>>> switch from dynamic to static, it sends a `
> > member.name`
> > > > > and the
> > > > > >     > >>> broker
> > > > > >     > >>>>>> registers the [member.name, member.id] in the
> cache.
> > > Why
> > > > > would
> > > > > >     > >> this
> > > > > > >     > >>>>>> interfere with all other consumers that use dynamic
> > > > > assignment?
> > > > > >     > >>>>>>
> > > > > > >     > >>>>>> Also, Guozhang mentioned that for all other requests,
> > we
> > > > > need to
> > > > > >     > >> check
> > > > > >     > >>>> if
> > > > > >     > >>>>>> the mapping [member.name, member.id] contains the
> > sent
> > > `
> > > > > member.id`
> > > > > >     > >>> --
> > > > > >     > >>>> I
> > > > > >     > >>>>>> don't think this is necessary -- it seems to be
> > > sufficient
> > > > > to check
> > > > > >     > >>> the
> > > > > >     > >>>>>> `member.id` from the [member.id, Assignment]
> mapping
> > as
> > > > we
> > > > > do
> > > > > >     > >> today
> > > > > >     > >>> --
> > > > > >     > >>>>>> thus, checking `member.id` does not require any
> > change
> > > > > IMHO.
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>
> > > > > >     > >>>>>> -Matthias
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>> On 8/7/18 7:13 PM, Guozhang Wang wrote:
> > > > > >     > >>>>>>> @James
> > > > > >     > >>>>>>>
> > > > > >     > >>>>>>> What you described is true: the transition from
> > dynamic
> > > > to
> > > > > static
> > > > > > >     > >>>>>>> membership is not thought through yet. But I do
> not
> > > > > think it is
> > > > > >     > >>> an
> > > > > >     > >>>>>>> impossible problem: note that we indeed moved the
> > > offset
> > > > > commit
> > > > > >     > >>> from
> > > > > >     > >>>> ZK
> > > > > >     > >>>>>> to
> > > > > >     > >>>>>>> kafka coordinator in 0.8.2 :) The migration plan is
> > to
> > > > first
> > > > > > >     > >>>>>>> double-commit on both zk and coordinator, and then
> > do
> > > a
> > > > > second
> > > > > >     > >>> round
> > > > > >     > >>>>> to
> > > > > >     > >>>>>>> turn the zk off.
> > > > > >     > >>>>>>>
> > > > > >     > >>>>>>> So just to throw a wild idea here: also following a two-rolling-bounce
> > > > > >     > >>>>>>> manner, in the JoinGroupRequest we can set the flag to "static" while
> > > > > >     > >>>>>>> still keeping the registry-id field empty. In this case, the coordinator
> > > > > >     > >>>>>>> still follows the logic of "dynamic", accepting the request while
> > > > > >     > >>>>>>> allowing the protocol to be set to "static". After the first rolling
> > > > > >     > >>>>>>> bounce, the group protocol is already "static"; then a second rolling
> > > > > >     > >>>>>>> bounce is triggered, and this time we set the registry-id.
> > > > > >     > >>>>>>>
> > > > > >     > >>>>>>>
> > > > > >     > >>>>>>> Guozhang
> > > > > >     > >>>>>>>
> > > > > >     > >>>>>>> On Tue, Aug 7, 2018 at 1:19 AM, James Cheng <wushujames@gmail.com> wrote:
> > > > > >     > >>>>>>>
> > > > > >     > >>>>>>>> Guozhang, in a previous message, you proposed this:
> > > > > >     > >>>>>>>>
> > > > > >     > >>>>>>>>> On Jul 30, 2018, at 3:56 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > >     > >>>>>>>>>
> > > > > >     > >>>>>>>>> 1. We bump up the JoinGroupRequest with additional fields:
> > > > > >     > >>>>>>>>>
> > > > > >     > >>>>>>>>> 1.a) a flag indicating "static" or "dynamic" membership protocols.
> > > > > >     > >>>>>>>>> 1.b) with "static" membership, we also add the pre-defined member id.
> > > > > >     > >>>>>>>>> 1.c) with "static" membership, we also add an optional
> > > > > >     > >>>>>>>>> "group-change-timeout" value.
> > > > > >     > >>>>>>>>>
> > > > > >     > >>>>>>>>> 2. On the broker side, we enforce only one of the two protocols for all
> > > > > >     > >>>>>>>>> group members: we accept the protocol of the first joined member of the
> > > > > >     > >>>>>>>>> group, and if later joining members indicate a different membership
> > > > > >     > >>>>>>>>> protocol, we reject them. If the group-change-timeout value differs from
> > > > > >     > >>>>>>>>> that of the first joined member, we reject it as well.
> > > > > >     > >>>>>>>>
> > > > > >     > >>>>>>>>
> > > > > >     > >>>>>>>> What will happen if we have an already-deployed application that wants
> > > > > >     > >>>>>>>> to switch to using static membership? Let's say there are 10 instances
> > > > > >     > >>>>>>>> of it. As the instances go through a rolling restart, they will switch
> > > > > >     > >>>>>>>> from dynamic membership (the default?) to static membership. As each one
> > > > > >     > >>>>>>>> leaves the group and restarts, it will be rejected from the group
> > > > > >     > >>>>>>>> (because the group is currently using dynamic membership). The group
> > > > > >     > >>>>>>>> will shrink down until there is 1 node handling all the traffic. After
> > > > > >     > >>>>>>>> that one restarts, the group will switch over to static membership.
> > > > > >     > >>>>>>>>
> > > > > >     > >>>>>>>> Is that right? That means that the transition plan from dynamic to
> > > > > >     > >>>>>>>> static membership isn't very smooth.
> > > > > >     > >>>>>>>>
> > > > > >     > >>>>>>>> I'm not really sure what can be done in this case. This reminds me of
> > > > > >     > >>>>>>>> the transition plans that were discussed for moving from zookeeper-based
> > > > > >     > >>>>>>>> consumers to kafka-coordinator-based consumers. That was also hard, and
> > > > > >     > >>>>>>>> ultimately we decided not to build that.
> > > > > >     > >>>>>>>>
> > > > > >     > >>>>>>>> -James
> > > > > >     > >>>>>>>>
> > > > > >     > >>>>>>>>
> > > > > >     > >>>>>>>
> > > > > >     > >>>>>>>
> > > > > >     > >>>>>>
> > > > > >     > >>>>>>
> > > > > >     > >>>>>
> > > > > >     > >>>>
> > > > > >     > >>>>
> > > > > >     > >>>>
> > > > > >     > >>>> --
> > > > > >     > >>>> -- Guozhang
> > > > > >     > >>>>
> > > > > >     > >>>
> > > > > >     > >>
> > > > > >     > >>
> > > > > >     > >>
> > > > > >     > >> --
> > > > > >     > >> -- Guozhang
> > > > > >     > >>
> > > > > >     >
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > > > --
> > > > -Regards,
> > > > Mayuresh R. Gharat
> > > > (862) 250-7125
> > > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
>
