kafka-dev mailing list archives

From Onur Karaman <okara...@linkedin.com.INVALID>
Subject Re: [DISCUSS] Client-side Assignment for New Consumer
Date Fri, 28 Aug 2015 20:02:35 GMT
From what I understand, the "largest number of partitions" trick is based
on the assumption that topics can only expand their partitions. What
happens when a topic gets deleted and recreated? This breaks that
assumption.
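
To make the concern concrete, here is a rough sketch of the resolution rule as I read it from the wiki: every member reports its view of the partition counts and the group takes the per-topic maximum. The class name, method name, topic name and counts are all made up for illustration; this is not code from the proposal.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionCountResolver {
    // Merge the per-topic partition counts reported by each consumer,
    // keeping the largest count seen for every topic.
    static Map<String, Integer> resolve(List<Map<String, Integer>> reported) {
        Map<String, Integer> resolved = new HashMap<>();
        for (Map<String, Integer> view : reported) {
            for (Map.Entry<String, Integer> e : view.entrySet()) {
                resolved.merge(e.getKey(), e.getValue(), Math::max);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Hypothetical scenario: topic "t" had 8 partitions, was deleted, and
        // was recreated with 4. One consumer still holds the stale count.
        Map<String, Integer> stale = Map.of("t", 8);
        Map<String, Integer> fresh = Map.of("t", 4);
        System.out.println(resolve(List.of(stale, fresh))); // prints {t=8}
    }
}
```

In this scenario the group settles on 8 partitions even though only 4 exist, so the max rule resolves the conflict in favor of the stale view.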

On Fri, Aug 28, 2015 at 6:33 AM, Neha Narkhede <neha@confluent.io> wrote:

> Thanks for re-reviewing Joel.
>
> On Fri, Aug 28, 2015 at 2:51 AM -0700, "Joel Koshy" <jjkoshy.w@gmail.com>
> wrote:
>
> > I think we think this proposal addresses 100% of the split brain issues
> > ever seen in the ZK-based protocol, but I think you think there are still
> > issues. Can you explain what you're thinking of and when you think it would
> > happen? I want to make sure you aren't assuming client-side=>split-brain
> > since I think that is totally not the case.
>
> Yes I had concluded that client-side assignment would still result in
> split-brain wrt partition counts, but I overlooked a key sentence in
> the wiki - i.e., that the assignment algorithm for consumers can just
> use the largest number of partitions for each topic reported by any of
> the consumers. i.e., I assumed that consumers would just fail
> rebalance if the partition counts were inconsistent but that is not
> the case since this conflict can be easily resolved as described
> without further join-group requests. Sorry about that. There is still
> the issue of the coordinator having to send back n*m worth of
> metadata, but that was not my biggest concern. I'll look over it again
> and reply back tomorrow.
>
> Joel
>
> On Thu, Aug 27, 2015 at 2:55 PM, Jay Kreps  wrote:
> > Hey Joel,
> >
> > I really don't think we should do both. There are pros and cons but we
> > should make a decision and work on operationalizing one approach. Much of
> > really making something like this work is getting all the bugs out, getting
> > monitoring in place, getting rigorous system tests in place. Trying to do
> > those things twice with the same resources will just mean we do them half
> > as well. I also think this buys nothing from the user's point of view--they
> > want co-ordination that works correctly, the debate we are having is purely
> > a "how should we build that" debate. So this is really not the kind of
> > thing we'd want to make pluggable and if we did that would just complicate
> > life for the user.
> >
> > I think we think this proposal addresses 100% of the split brain issues
> > ever seen in the ZK-based protocol, but I think you think there are still
> > issues. Can you explain what you're thinking of and when you think it would
> > happen? I want to make sure you aren't assuming client-side=>split-brain
> > since I think that is totally not the case.
> >
> > With respect to "herd issues" I actually think all the proposals address
> > this by scaling the co-ordinator out to all nodes and making the
> > co-ordination vastly cheaper. No proposal, of course, gets rid of the fact
> > that all clients rejoin at once when there is a membership change, but that
> > is kind of fundamental to the problem.
> >
> > -Jay
> >
> > On Thu, Aug 27, 2015 at 2:02 PM, Joel Koshy  wrote:
> >
> >> I actually feel this set of tests (whatever they may be) is somewhat
> >> irrelevant here. My main concern with the current client-side proposal
> >> (i.e., without Becket's follow-up suggestions) is that it makes a
> >> significant compromise to the original charter of the new consumer -
> >> i.e., reduce/eliminate herd and split brain problems in both group
> >> management and partition assignment. I understand the need for
> >> client-side partition assignment in some use cases (which we are also
> >> interested in), but I also think we should make every effort to keep
> >> full server-side coordination for the remaining (majority) of use
> >> cases especially if it does not complicate the protocol. The proposed
> >> changes do not complicate the protocol IMO - i.e., there is no further
> >> modification to the request/response formats beyond the current
> >> client-side proposal. It only involves a trivial reinterpretation of
> >> the content of the protocol metadata field.
> >>
> >> Joel
> >>
> >> On Wed, Aug 26, 2015 at 9:33 PM, Neha Narkhede  wrote:
> >> > Hey Becket,
> >> >
> >> >> In that case, the broker side partition assignment would be ideal because
> >> >> it avoids issues like metadata inconsistency / split brain / exploding
> >> >> subscription set propagation.
> >> >
> >> >
> >> > As per our previous discussions regarding each of those concerns (referring
> >> > to this email thread, KIP calls and JIRA comments), we are going to run a
> >> > set of tests using the LinkedIn deployment numbers that we will wait for
> >> > you to share. The purpose is to see if those concerns are really valid or
> >> > not. I'd prefer to see that before making any more changes that will
> >> > complicate the protocol.
> >> >
> >> > On Wed, Aug 26, 2015 at 4:57 PM, Jiangjie Qin wrote:
> >> >
> >> >> Hi folks,
> >> >>
> >> >> After further discussion in LinkedIn, we found that while having a more
> >> >> general group management protocol is very useful, the vast majority of the
> >> >> clients will not use a customized partition assignment strategy. In that
> >> >> case, the broker side partition assignment would be ideal because it avoids
> >> >> issues like metadata inconsistency / split brain / exploding subscription
> >> >> set propagation.
> >> >>
> >> >> So we have the following proposal that satisfies the majority of the
> >> >> clients' needs without changing the currently proposed binary protocol,
> >> >> i.e., continue to support broker-side assignment if the assignment strategy
> >> >> is recognized by the coordinator.
> >> >>
> >> >> 1. Keep the binary protocol as currently proposed.
> >> >>
> >> >> 2. Change the way we interpret ProtocolMetadata:
> >> >> 2.1 On consumer side, change partition.assignment.strategy to
> >> >> partition.assignor.class. Implement something like the following
> >> >> PartitionAssignor interface:
> >> >>
> >> >> public interface PartitionAssignor {
> >> >>   List protocolTypes();
> >> >>   byte[] protocolMetadata();
> >> >>   // Returns the Topic->List map that is assigned to this consumer.
> >> >>   List assignPartitions(String protocolType,
> >> >>                         byte[] responseProtocolMetadata);
> >> >> }
> >> >>
> >> >> public abstract class AbstractPartitionAssignor implements
> >> >> PartitionAssignor {
> >> >>   protected final KafkaConsumer consumer;
> >> >>   AbstractPartitionAssignor(KafkaConsumer consumer) {
> >> >>     this.consumer = consumer;
> >> >>   }
> >> >> }
> >> >>
> >> >> 2.2 The ProtocolMetadata in JoinGroupRequest will be
> >> >> partitionAssignor.protocolMetadata(). When partition.assignor.class is
> >> >> "range" or "roundrobin", the ProtocolMetadata in JoinGroupRequest will be a
> >> >> JSON subscription set. ("range" and "roundrobin" will be reserved words; we
> >> >> can also consider reserving some prefix such as "broker-" to be more clear.)
> >> >> 2.3 On broker side, when ProtocolType is "range" or "roundrobin", the
> >> >> coordinator will parse the ProtocolMetadata in the JoinGroupRequest and
> >> >> assign the partitions for consumers. In the JoinGroupResponse, the
> >> >> ProtocolMetadata will be the global assignment of partitions.
> >> >> 2.4 On client side, after receiving the JoinGroupResponse,
> >> >> partitionAssignor.assignPartitions() will be invoked to return the actual
> >> >> assignment. If the assignor is RangeAssignor or RoundRobinAssignor, it
> >> >> will parse the assignment from the ProtocolMetadata returned by the
> >> >> coordinator.
> >> >>
> >> >> This approach has a few merits:
> >> >> 1. Does not change the proposed binary protocol, which is still general.
> >> >> 2. The majority of the consumers will not suffer from inconsistent metadata
> >> >> / split brain / exploding subscription set propagation. This is
> >> >> specifically to deal with the issue that the current proposal caters to a
> >> >> 20% use-case while adversely impacting the more common 80% use-cases.
> >> >> 3. Easy to implement. The only thing needed is to implement a partitioner
> >> >> class. For most users, the default range and roundrobin partitioners are
> >> >> good enough.
> >> >>
> >> >> Thoughts?
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> On Tue, Aug 18, 2015 at 2:51 PM, Jason Gustafson
> >> >> wrote:
> >> >>
> >> >> > Follow-up from the kip call:
> >> >> >
> >> >> > 1. Onur brought up the question of whether this protocol provides enough
> >> >> > coordination capabilities to be generally useful in practice (is that
> >> >> > accurate, Onur?). If it doesn't, then each use case would probably need a
> >> >> > dependence on zookeeper anyway, and we haven't really gained anything. The
> >> >> > group membership provided by this protocol is a useful primitive for
> >> >> > coordination, but it's limited in the sense that everything shared among
> >> >> > the group has to be communicated at the time the group is created. If any
> >> >> > shared data changes, then the only way the group can ensure agreement is to
> >> >> > force a rebalance. This is expensive since all members must stall while the
> >> >> > rebalancing takes place. As we have also seen, there is a practical limit
> >> >> > on the amount of metadata that can be sent through this protocol when
> >> >> > groups get a little larger. This protocol is therefore not suitable for
> >> >> > cases which require frequent communication or which require a large amount
> >> >> > of data to be communicated. For the use cases listed on the wiki, neither
> >> >> > of these appears to be an issue, but there may be other limitations which
> >> >> > would limit reuse of the protocol. Perhaps it would be sufficient to sketch
> >> >> > how these cases might work?
> >> >> >
> >> >> > 2. We talked a little bit about the issue of metadata churn. Becket brought
> >> >> > up the interesting point that not only do we depend on topic metadata
> >> >> > changing relatively infrequently, but we also expect timely agreement among
> >> >> > the brokers on what that metadata is. To resolve this, we can have the
> >> >> > consumers fetch metadata from the coordinator. We still depend on topic
> >> >> > metadata not changing frequently, but this should resolve any disagreement
> >> >> > among the brokers themselves. In fact, since we expect that disagreement is
> >> >> > relatively rare, we can have the consumers fetch from the coordinator only
> >> >> > when a disagreement occurs. The nice thing about this proposal is that
> >> >> > it doesn't affect the join group semantics, so the coordinator would remain
> >> >> > oblivious to the metadata used by the group for agreement. Also, if
> >> >> > metadata churn becomes an issue, it might be possible to have the
> >> >> > coordinator provide a snapshot for the group to ensure that a generation
> >> >> > would be able to reach agreement (this would probably require adding
> >> >> > groupId/generation to the metadata request).
> >> >> >
> >> >> > 3. We talked briefly about support for multiple protocols in the join group
> >> >> > request in order to allow changing the assignment strategy without
> >> >> > downtime. I think it's a little doubtful that this would get much use in
> >> >> > practice, but I agree it's a nice option to have on the table. An
> >> >> > alternative, for the sake of argument, is to have each member provide only
> >> >> > one version of the protocol, and to let the coordinator choose the protocol
> >> >> > with the largest number of supporters. All members which can't support the
> >> >> > selected protocol would be kicked out of the group. The drawback in a
> >> >> > rolling upgrade is that the total capacity of the group would be
> >> >> > momentarily halved. It would also be a little tricky to handle the case of
> >> >> > retrying when a consumer is kicked out of the group. We wouldn't want it to
> >> >> > be able to effect a rebalance, for example, if it would just be kicked out
> >> >> > again. That would probably complicate the group management logic on the
> >> >> > coordinator.
> >> >> >
> >> >> >
> >> >> > Thanks,
> >> >> > Jason
> >> >> >
> >> >> >
> >> >> > On Tue, Aug 18, 2015 at 11:16 AM, Jiangjie Qin wrote:
> >> >> >
> >> >> > > Jun,
> >> >> > >
> >> >> > > Yes, I agree. If the metadata can be synced quickly there should not be an
> >> >> > > issue. It just occurred to me that there is a proposal to allow consuming
> >> >> > > from followers in ISR, which could potentially cause more frequent metadata
> >> >> > > changes for consumers. Would that be an issue?
> >> >> > >
> >> >> > > Thanks,
> >> >> > >
> >> >> > > Jiangjie (Becket) Qin
> >> >> > >
> >> >> > > On Tue, Aug 18, 2015 at 10:22 AM, Jason Gustafson <jason@confluent.io>
> >> >> > > wrote:
> >> >> > >
> >> >> > > > Hi Jun,
> >> >> > > >
> >> >> > > > Answers below:
> >> >> > > >
> >> >> > > > 1. When there are multiple common protocols in the JoinGroupRequest, which
> >> >> > > > one would the coordinator pick?
> >> >> > > >
> >> >> > > > I was intending to use the list to indicate preference. If all group
> >> >> > > > members support protocols ["A", "B"] in that order, then we will choose
> >> >> > > > "A." If some support ["B", "A"], then we would either choose based on
> >> >> > > > respective counts or just randomly. The main use case of supporting the
> >> >> > > > list is for rolling upgrades when a change is made to the assignment
> >> >> > > > strategy. In that case, the new assignment strategy would be listed first
> >> >> > > > in the upgraded client. I think it's debatable whether this feature would
> >> >> > > > get much use in practice, so we might consider dropping it.
> >> >> > > >
> >> >> > > > 2. If the protocols don't agree, the group construction fails. What exactly
> >> >> > > > does it mean? Do we send an error in every JoinGroupResponse and remove all
> >> >> > > > members in the group in the coordinator?
> >> >> > > >
> >> >> > > > Yes, that is right. It would be handled similarly to inconsistent
> >> >> > > > assignment strategies in the current protocol. The coordinator returns an
> >> >> > > > error in each join group response, and the client propagates the error to
> >> >> > > > the user.
> >> >> > > >
> >> >> > > > 3. Consumer embedded protocol: The proposal has two different formats of
> >> >> > > > subscription depending on whether wildcards are used or not. This seems a
> >> >> > > > bit complicated. Would it be better to always use the metadata hash? The
> >> >> > > > clients know the subscribed topics already. This way, the client code
> >> >> > > > behaves the same whether wildcards are used or not.
> >> >> > > >
> >> >> > > > Yeah, I think this is possible (Neha also suggested it). I haven't updated
> >> >> > > > the wiki yet, but the patch I started working on uses only the metadata
> >> >> > > > hash. In the case that an explicit topic list is provided, the hash just
> >> >> > > > covers the metadata for those topics.
> >> >> > > >
> >> >> > > >
> >> >> > > > Thanks,
> >> >> > > > Jason
> >> >> > > >
> >> >> > > >
> >> >> > > >
> >> >> > > > On Tue, Aug 18, 2015 at 10:06 AM, Jun Rao wrote:
> >> >> > > >
> >> >> > > > > Jason,
> >> >> > > > >
> >> >> > > > > Thanks for the writeup. A few comments below.
> >> >> > > > >
> >> >> > > > > 1. When there are multiple common protocols in the JoinGroupRequest, which
> >> >> > > > > one would the coordinator pick?
> >> >> > > > > 2. If the protocols don't agree, the group construction fails. What exactly
> >> >> > > > > does it mean? Do we send an error in every JoinGroupResponse and remove all
> >> >> > > > > members in the group in the coordinator?
> >> >> > > > > 3. Consumer embedded protocol: The proposal has two different formats of
> >> >> > > > > subscription depending on whether wildcards are used or not. This seems a
> >> >> > > > > bit complicated. Would it be better to always use the metadata hash? The
> >> >> > > > > clients know the subscribed topics already. This way, the client code
> >> >> > > > > behaves the same whether wildcards are used or not.
> >> >> > > > >
> >> >> > > > > Jiangjie,
> >> >> > > > >
> >> >> > > > > With respect to rebalance churn due to topics being created/deleted: with
> >> >> > > > > the new consumer, the rebalance can probably settle within 200ms when there
> >> >> > > > > is a topic change. So, as long as we are not changing topics more than 5
> >> >> > > > > times per sec, there shouldn't be constant churn, right?
> >> >> > > > >
> >> >> > > > > Thanks,
> >> >> > > > >
> >> >> > > > > Jun
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <jason@confluent.io>
> >> >> > > > > wrote:
> >> >> > > > >
> >> >> > > > > > Hi Kafka Devs,
> >> >> > > > > >
> >> >> > > > > > One of the nagging issues in the current design of the new consumer has
> >> >> > > > > > been the need to support a variety of assignment strategies. We've
> >> >> > > > > > encountered this in particular in the design of copycat and the processing
> >> >> > > > > > framework (KIP-28). From what I understand, Samza also has a number of use
> >> >> > > > > > cases with custom assignment needs. The new consumer protocol supports new
> >> >> > > > > > assignment strategies by hooking them into the broker. For many
> >> >> > > > > > environments, this is a major pain and in some cases, a non-starter. It
> >> >> > > > > > also challenges the validation that the coordinator can provide. For
> >> >> > > > > > example, some assignment strategies call for partitions to be assigned
> >> >> > > > > > multiple times, which means that the coordinator can only check that
> >> >> > > > > > partitions have been assigned at least once.
> >> >> > > > > >
> >> >> > > > > > To solve these issues, we'd like to propose moving assignment to the
> >> >> > > > > > client. I've written a wiki which outlines some protocol changes to achieve
> >> >> > > > > > this:
> >> >> > > > > >
> >> >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> >> >> > > > > > .
> >> >> > > > > > To summarize briefly, instead of the coordinator assigning the partitions
> >> >> > > > > > itself, all subscriptions are forwarded to each member of the group, which
> >> >> > > > > > then decides independently which partitions it should consume. The protocol
> >> >> > > > > > provides a mechanism for the coordinator to validate that all consumers use
> >> >> > > > > > the same assignment strategy, but it does not ensure that the resulting
> >> >> > > > > > assignment is "correct." This provides a powerful capability for users to
> >> >> > > > > > control the full data flow on the client side. They control how data is
> >> >> > > > > > written to partitions through the Partitioner interface and they control
> >> >> > > > > > how data is consumed through the assignment strategy, all without touching
> >> >> > > > > > the server.
> >> >> > > > > >
> >> >> > > > > > Of course nothing comes for free. In particular, this change removes the
> >> >> > > > > > ability of the coordinator to validate that commits are made by consumers
> >> >> > > > > > who were assigned the respective partition. This might not be too bad since
> >> >> > > > > > we retain the ability to validate the generation id, but it is a potential
> >> >> > > > > > concern. We have considered alternative protocols which add a second
> >> >> > > > > > round-trip to the protocol in order to give the coordinator the ability to
> >> >> > > > > > confirm the assignment. As mentioned above, the coordinator is somewhat
> >> >> > > > > > limited in what it can actually validate, but this would return its ability
> >> >> > > > > > to validate commits. The tradeoff is that it increases the protocol's
> >> >> > > > > > complexity which means more ways for the protocol to fail and consequently
> >> >> > > > > > more edge cases in the code.
> >> >> > > > > >
> >> >> > > > > > It also misses an opportunity to generalize the group membership protocol
> >> >> > > > > > for additional use cases. In fact, after you've gone to the trouble of
> >> >> > > > > > moving assignment to the client, the main thing that is left in this
> >> >> > > > > > protocol is basically a general group management capability. This is
> >> >> > > > > > exactly what is needed for a few cases that are currently under discussion
> >> >> > > > > > (e.g. copycat or single-writer producer). We've taken this further step in
> >> >> > > > > > the proposal and attempted to envision what that general protocol might
> >> >> > > > > > look like and how it could be used both by the consumer and for some of
> >> >> > > > > > these other cases.
> >> >> > > > > >
> >> >> > > > > > Anyway, since time is running out on the new consumer, we have perhaps one
> >> >> > > > > > last chance to consider a significant change in the protocol like this, so
> >> >> > > > > > have a look at the wiki and share your thoughts. I've no doubt that some
> >> >> > > > > > ideas seem clearer in my mind than they do on paper, so ask questions if
> >> >> > > > > > there is any confusion.
> >> >> > > > > >
> >> >> > > > > > Thanks!
> >> >> > > > > > Jason
> >> >> > > > > >
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > Thanks,
> >> > Neha
> >>
>
