kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id
Date Fri, 24 Aug 2018 18:44:59 GMT
Hey Jason,

I like your idea to simplify the upgrade protocol to allow co-exist of
static and dynamic members. Admittedly it may make the coordinator-side
logic a bit more complex, but I think it worth doing it.

Originally I was trying to kill more birds with one stone with KIP-345,
e.g. to fix the multi-rebalance issue on starting up / shutting down a
multi-instance client (mentioned as case 1)/2) in my early email), and
hence proposing to have a pure static-membership protocol. But thinking
twice about it I now feel it may be too ambitious and worth fixing in
another KIP. With that, I think what you've proposed here is a good way to
go for KIP-345 itself.

Note there are a few details in your proposal we'd still need to figure out:

1. How this longish static member expiration timeout defined? Is it via a
broker, hence global config, or via a client config which can be
communicated to broker via JoinGroupRequest?

2. Assuming that for static members, LEAVE_GROUP request will not trigger a
rebalance immediately either, similar to session timeout, but only the
longer member expiration timeout, can we remove the internal "
internal.leave.group.on.close" config, which is a quick walk-around then?



Guozhang


On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson <jason@confluent.io>
wrote:

> Hey All,
>
> Nice to see some solid progress on this. It sounds like one of the
> complications is allowing static and dynamic registration to coexist. I'm
> wondering if we can do something like the following:
>
> 1. Statically registered members (those joining the group with a non-null `
> member.name`) maintain a session with the coordinator just like dynamic
> members.
> 2. If a session is active for a static member when a rebalance begins, then
> basically we'll keep the current behavior. The rebalance will await the
> static member joining the group.
> 3. If a static member does not have an active session, then the coordinator
> will not wait for it to join, but will still include it in the rebalance.
> The coordinator will forward the cached subscription information to the
> leader and will cache the assignment after the rebalance completes. (Note
> that we still have the generationId to fence offset commits from a static
> zombie if the assignment changes.)
> 4. When a static member leaves the group or has its session expire, no
> rebalance is triggered. Instead, we can begin a timer to expire the static
> registration. This would be a longish timeout (like 30 minutes say).
>
> So basically static members participate in all rebalances regardless
> whether they have an active session. In a given rebalance, some of the
> members may be static and some dynamic. The group leader can differentiate
> the two based on the presence of the `member.name` (we have to add this to
> the JoinGroupResponse). Generally speaking, we would choose leaders
> preferentially from the active members that support the latest JoinGroup
> protocol and are using static membership. If we have to choose a leader
> with an old version, however, it would see all members in the group (static
> or dynamic) as dynamic members and perform the assignment as usual.
>
> Would that work?
>
> -Jason
>
>
> On Thu, Aug 23, 2018 at 5:26 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Hello Boyang,
> >
> > Thanks for the updated proposal, a few questions:
> >
> > 1. Where will "change-group-timeout" be communicated to the broker? Will
> > that be a new field in the JoinGroupRequest, or are we going to
> piggy-back
> > on the existing session-timeout field (assuming that the original value
> > will not be used anywhere in the static membership any more)?
> >
> > 2. "However, if the consumer takes longer than session timeout to return,
> > we shall still trigger rebalance but it could still try to catch
> > `change-group-timeout`.": what does this mean? I thought your proposal is
> > that for static memberships, the broker will NOT trigger rebalance even
> > after session-timeout has been detected, but only that after
> > change-group-timeout
> > which is supposed to be longer than session-timeout to be defined?
> >
> > 3. "A join group request with member.name set will be treated as
> > `static-membership` strategy", in this case, how would the switch from
> > dynamic to static happen, since whoever changed the member.name to
> > not-null
> > will be rejected, right?
> >
> > 4. "just erase the cached mapping, and wait for session timeout to
> trigger
> > rebalance should be sufficient." this is also a bit unclear to me: who
> will
> > erase the cached mapping? Since it is on the broker-side I assume that
> > broker has to do it. Are you suggesting to use a new request for it?
> >
> > 5. "Halfway switch": following 3) above, if your proposal is basically to
> > let "first join-request wins", and the strategy will stay as is until all
> > members are gone, then this will also not happen since whoever used
> > different strategy as the first guy who sends join-group request will be
> > rejected right?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Aug 21, 2018 at 9:28 AM, John Roesler <john@confluent.io> wrote:
> >
> > > This sounds good to me!
> > >
> > > Thanks for the time you've spent on it,
> > > -John
> > >
> > > On Tue, Aug 21, 2018 at 12:13 AM Boyang Chen <bchen11@outlook.com>
> > wrote:
> > >
> > > > Thanks Matthias for the input. Sorry I was busy recently and haven't
> > got
> > > > time to update this thread. To summarize what we come up so far, here
> > is
> > > a
> > > > draft updated plan:
> > > >
> > > >
> > > > Introduce a new config called `member.name` which is supposed to be
> > > > provided uniquely by the consumer client. The broker will maintain a
> > > cache
> > > > with [key:member.name, value:member.id]. A join group request with
> > > > member.name set will be treated as `static-membership` strategy, and
> > > will
> > > > reject any join group request without member.name. So this
> > coordination
> > > > change will be differentiated from the `dynamic-membership` protocol
> we
> > > > currently have.
> > > >
> > > >
> > > > When handling static join group request:
> > > >
> > > >   1.   The broker will check the membership to see whether this is a
> > new
> > > > member. If new, broker allocate a unique member id, cache the mapping
> > and
> > > > move to rebalance stage.
> > > >   2.   Following 1, if this is an existing member, broker will not
> > change
> > > > group state, and return its cached member.id and current assignment.
> > > > (unless this is leader, we shall trigger rebalance)
> > > >   3.   Although Guozhang has mentioned we could rejoin with pair
> member
> > > > name and id, I think for join group request it is ok to leave member
> id
> > > > blank as member name is the unique identifier. In commit offset
> request
> > > we
> > > > *must* have both.
> > > >
> > > >
> > > > When handling commit offset request, if enabled with static
> membership,
> > > > each time the commit request must have both member.name and
> member.id
> > to
> > > > be identified as a `certificated member`. If not, this means there
> are
> > > > duplicate consumer members with same member name and the request will
> > be
> > > > rejected to guarantee consumption uniqueness.
> > > >
> > > >
> > > > When rolling restart/shutting down gracefully, the client will send a
> > > > leave group request (static membership mode). In static membership,
> we
> > > will
> > > > also define `change-group-timeout` to hold on rebalance provided by
> > > leader.
> > > > So we will wait for all the members to rejoin the group and do
> exactly
> > > one
> > > > rebalance since all members are expected to rejoin within timeout. If
> > > > consumer crashes, the join group request from the restarted consumer
> > will
> > > > be recognized as an existing member and be handled as above condition
> > 1;
> > > > However, if the consumer takes longer than session timeout to return,
> > we
> > > > shall still trigger rebalance but it could still try to catch
> > > > `change-group-timeout`. If it failed to catch second timeout, its
> > cached
> > > > state on broker will be garbage collected and trigger a new rebalance
> > > when
> > > > it finally joins.
> > > >
> > > >
> > > > And consider the switch between dynamic to static membership.
> > > >
> > > >   1.  Dynamic to static: the first joiner shall revise the membership
> > to
> > > > static and wait for all the current members to restart, since their
> > > > membership is still dynamic. Here our assumption is that the restart
> > > > process shouldn't take a long time, as long restart is breaking the
> > > > `rebalance timeout` in whatever membership protocol we are using.
> > Before
> > > > restart, all dynamic member join requests will be rejected.
> > > >   2.  Static to dynamic: this is more like a downgrade which should
> be
> > > > smooth: just erase the cached mapping, and wait for session timeout
> to
> > > > trigger rebalance should be sufficient. (Fallback to current
> behavior)
> > > >   3.  Halfway switch: a corner case is like some clients keep dynamic
> > > > membership while some keep static membership. This will cause the
> group
> > > > rebalance forever without progress because dynamic/static states are
> > > > bouncing each other. This could guarantee that we will not make the
> > > > consumer group work in a wrong state by having half static and half
> > > dynamic.
> > > >
> > > > To guarantee correctness, we will also push the member name/id pair
> to
> > > > _consumed_offsets topic (as Matthias pointed out) and upgrade the API
> > > > version, these details will be further discussed back in the KIP.
> > > >
> > > >
> > > > Are there any concern for this high level proposal? Just want to
> > > reiterate
> > > > on the core idea of the KIP: "If the broker recognize this consumer
> as
> > an
> > > > existing member, it shouldn't trigger rebalance".
> > > >
> > > > Thanks a lot for everyone's input! I feel this proposal is much more
> > > > robust than previous one!
> > > >
> > > >
> > > > Best,
> > > >
> > > > Boyang
> > > >
> > > > ________________________________
> > > > From: Matthias J. Sax <matthias@confluent.io>
> > > > Sent: Friday, August 10, 2018 2:24 AM
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances
> by
> > > > specifying member id
> > > >
> > > > Hi,
> > > >
> > > > thanks for the detailed discussion. I learned a lot about internals
> > again
> > > > :)
> > > >
> > > > I like the idea or a user config `member.name` and to keep `
> member.id`
> > > > internal. Also agree with Guozhang, that reusing `client.id` might
> not
> > > > be a good idea.
> > > >
> > > > To clarify the algorithm, each time we generate a new `member.id`,
> we
> > > > also need to update the "group membership" information (ie, mapping
> > > > [member.id, Assignment]), right? Ie, the new `member.id` replaces
> the
> > > > old entry in the cache.
> > > >
> > > > I also think, we need to preserve the `member.name -> member.id`
> > mapping
> > > > in the `__consumer_offset` topic. The KIP should mention this IMHO.
> > > >
> > > > For changing the default value of config `leave.group.on.close`. I
> > agree
> > > > with John, that we should not change the default config, because it
> > > > would impact all consumer groups with dynamic assignment. However, I
> > > > think we can document, that if static assignment is used (ie,
> > > > `member.name` is configured) we never send a LeaveGroupRequest
> > > > regardless of the config. Note, that the config is internal, so not
> > sure
> > > > how to document this in detail. We should not expose the internal
> > config
> > > > in the docs.
> > > >
> > > > About upgrading: why do we need have two rolling bounces and encode
> > > > "static" vs "dynamic" in the JoinGroupRequest?
> > > >
> > > > If we upgrade an existing consumer group from dynamic to static, I
> > don't
> > > > see any reason why both should not work together and single rolling
> > > > bounce would not be sufficient? If we bounce the first consumer and
> > > > switch from dynamic to static, it sends a `member.name` and the
> broker
> > > > registers the [member.name, member.id] in the cache. Why would this
> > > > interfere with all other consumer that use dynamic assignment?
> > > >
> > > > Also, Guozhang mentioned that for all other request, we need to check
> > if
> > > > the mapping [member.name, member.id] contains the send `member.id`
> --
> > I
> > > > don't think this is necessary -- it seems to be sufficient to check
> the
> > > > `member.id` from the [member.id, Assignment] mapping as be do today
> --
> > > > thus, checking `member.id` does not require any change IMHO.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 8/7/18 7:13 PM, Guozhang Wang wrote:
> > > > > @James
> > > > >
> > > > > What you described is true: the transition from dynamic to static
> > > > > memberships are not thought through yet. But I do not think it is
> an
> > > > > impossible problem: note that we indeed moved the offset commit
> from
> > ZK
> > > > to
> > > > > kafka coordinator in 0.8.2 :) The migration plan is to first to
> > > > > double-commits on both zk and coordinator, and then do a second
> round
> > > to
> > > > > turn the zk off.
> > > > >
> > > > > So just to throw a wild idea here: also following a
> > two-rolling-bounce
> > > > > manner, in the JoinGroupRequest we can set the flag to "static"
> while
> > > > keep
> > > > > the registry-id field empty still, in this case, the coordinator
> > still
> > > > > follows the logic of "dynamic", accepting the request while
> allowing
> > > the
> > > > > protocol to be set to "static"; after the first rolling bounce, the
> > > group
> > > > > protocol is already "static", then a second rolling bounce is
> > triggered
> > > > and
> > > > > this time we set the registry-id.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Aug 7, 2018 at 1:19 AM, James Cheng <wushujames@gmail.com>
> > > > wrote:
> > > > >
> > > > >> Guozhang, in a previous message, you proposed said this:
> > > > >>
> > > > >>> On Jul 30, 2018, at 3:56 PM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > > > >>>
> > > > >>> 1. We bump up the JoinGroupRequest with additional fields:
> > > > >>>
> > > > >>>  1.a) a flag indicating "static" or "dynamic" membership
> protocols.
> > > > >>>  1.b) with "static" membership, we also add the pre-defined
> member
> > > id.
> > > > >>>  1.c) with "static" membership, we also add an optional
> > > > >>> "group-change-timeout" value.
> > > > >>>
> > > > >>> 2. On the broker side, we enforce only one of the two protocols
> for
> > > all
> > > > >>> group members: we accept the protocol on the first joined
member
> of
> > > the
> > > > >>> group, and if later joining members indicate a different
> membership
> > > > >>> protocol, we reject it. If the group-change-timeout value
was
> > > different
> > > > >> to
> > > > >>> the first joined member, we reject it as well.
> > > > >>
> > > > >>
> > > > >> What will happen if we have an already-deployed application that
> > wants
> > > > to
> > > > >> switch to using static membership? Let’s say there are 10
> instances
> > of
> > > > it.
> > > > >> As the instances go through a rolling restart, they will switch
> from
> > > > >> dynamic membership (the default?) to static membership. As each
> one
> > > > leaves
> > > > >> the group and restarts, they will be rejected from the group
> > (because
> > > > the
> > > > >> group is currently using dynamic membership). The group will
> shrink
> > > down
> > > > >> until there is 1 node handling all the traffic. After that one
> > > restarts,
> > > > >> the group will switch over to static membership.
> > > > >>
> > > > >> Is that right? That means that the transition plan from dynamic
to
> > > > static
> > > > >> membership isn’t very smooth.
> > > > >>
> > > > >> I’m not really sure what can be done in this case. This reminds
me
> > of
> > > > the
> > > > >> transition plans that were discussed for moving from
> zookeeper-based
> > > > >> consumers to kafka-coordinator-based consumers. That was also
> hard,
> > > and
> > > > >> ultimately we decided not to build that.
> > > > >>
> > > > >> -James
> > > > >>
> > > > >>
> > > > >
> > > > >
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message