kafka-dev mailing list archives

From Joel Koshy <jjkosh...@gmail.com>
Subject Re: [DISCUSS] Client-side Assignment for New Consumer
Date Thu, 27 Aug 2015 21:02:18 GMT
I actually feel these tests (whatever they may be) are somewhat
irrelevant here. My main concern with the current client-side proposal
(i.e., without Becket's follow-up suggestions) is that it makes a
significant compromise to the original charter of the new consumer -
i.e., reduce/eliminate herd and split brain problems in both group
management and partition assignment. I understand the need for
client-side partition assignment in some use cases (which we are also
interested in), but I also think we should make every effort to keep
full server-side coordination for the remaining (majority) of use
cases especially if it does not complicate the protocol. The proposed
changes do not complicate the protocol IMO - i.e., there is no further
modification to the request/response formats beyond the current
client-side proposal. It only involves a trivial reinterpretation of
the content of the protocol metadata field.

Joel

On Wed, Aug 26, 2015 at 9:33 PM, Neha Narkhede <neha@confluent.io> wrote:
> Hey Becket,
>
>> In that case, the broker side partition assignment would be ideal because
>> it avoids issues like metadata inconsistency / split brain / exploding
>> subscription set propagation.
>
>
> As per our previous discussions regarding each of those concerns (referring
> to this email thread, KIP calls and JIRA comments), we are going to run a
> set of tests using the LinkedIn deployment numbers that we will wait for
> you to share. The purpose is to see if those concerns are really valid or
> not. I'd prefer to see that before making any more changes that will
> complicate the protocol.
>
> On Wed, Aug 26, 2015 at 4:57 PM, Jiangjie Qin <jqin@linkedin.com.invalid> wrote:
>
>> Hi folks,
>>
>> After further discussion at LinkedIn, we found that while having a more
>> general group management protocol is very useful, the vast majority of the
>> clients will not use customized partition assignment strategy. In that
>> case, the broker side partition assignment would be ideal because it avoids
>> issues like metadata inconsistency / split brain / exploding subscription
>> set propagation.
>>
>> So we have the following proposal that satisfies the majority of the
>> clients' needs without changing the currently proposed binary protocol,
>> i.e., continue to support broker-side assignment if the assignment strategy
>> is recognized by the coordinator.
>>
>> 1. Keep the binary protocol as currently proposed.
>>
>> 2. Change the way we interpret ProtocolMetadata:
>> 2.1 On consumer side, change partition.assignment.strategy to
>> partition.assignor.class. Implement something like the following
>> PartitionAssignor interface:
>>
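>> // PartitionAssignor.java
>> import java.util.List;
>> import org.apache.kafka.common.TopicPartition;
>>
>> public interface PartitionAssignor {
>>   // Protocol types (assignment strategies) this assignor advertises.
>>   List<String> protocolTypes();
>>   // Metadata to send in the JoinGroupRequest.
>>   byte[] protocolMetadata();
>>   // Returns the partitions assigned to this consumer, extracted from the
>>   // ProtocolMetadata in the JoinGroupResponse.
>>   List<TopicPartition> assignPartitions(String protocolType,
>>                                         byte[] responseProtocolMetadata);
>> }
>>
>> // AbstractPartitionAssignor.java
>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>
>> public abstract class AbstractPartitionAssignor implements PartitionAssignor {
>>   protected final KafkaConsumer<?, ?> consumer;
>>
>>   protected AbstractPartitionAssignor(KafkaConsumer<?, ?> consumer) {
>>     this.consumer = consumer;
>>   }
>> }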
>>
>> 2.2 The ProtocolMetadata in JoinGroupRequest will be
>> partitionAssignor.protocolMetadata(). When partition.assignor.class is
>> "range" or "roundrobin", the ProtocolMetadata in JoinGroupRequest will be a
>> JSON subscription set. ("range", "roundrobin" will be reserved words, we
>> can also consider reserving some Prefix such as "broker-" to be more clear)
>> 2.3 On broker side when ProtocolType is "range" or "roundrobin",
>> coordinator will parse the ProtocolMetadata in the JoinGroupRequest and
>> assign the partitions for consumers. In the JoinGroupResponse, the
>> ProtocolMetadata will be the global assignment of partitions.
>> 2.4 On client side, after receiving the JoinGroupResponse,
>> partitionAssignor.assignPartitions() will be invoked to return the actual
>> assignment. If the assignor is RangeAssignor or RoundRobinAssignor, they
>> will parse the assignment from the ProtocolMetadata returned by
>> coordinator.
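Steps 2.3 and 2.4 can be sketched as a coordinator-side range assignment followed by a client-side lookup. This is a hypothetical illustration, not Kafka's implementation: the class and method names are invented, plain maps stand in for the JSON subscription set and the response ProtocolMetadata, and partitions are written as "topic-N" strings for brevity.

```java
import java.util.*;

// Hypothetical sketch of steps 2.3/2.4: the coordinator runs a range
// assignment over all members' subscriptions, and each client extracts its
// own share from the global assignment returned in the JoinGroupResponse.
final class BrokerRangeAssignmentSketch {

    // Coordinator side: for each topic, sort its subscribers by member id and
    // give each one a contiguous range; the first (partitions % subscribers)
    // members receive one extra partition.
    static Map<String, List<String>> assign(Map<String, Set<String>> subscriptions,
                                            Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : subscriptions.keySet()) {
            assignment.put(member, new ArrayList<>());
        }
        for (String topic : new TreeSet<>(partitionsPerTopic.keySet())) {
            List<String> subscribers = new ArrayList<>();
            for (Map.Entry<String, Set<String>> e : subscriptions.entrySet()) {
                if (e.getValue().contains(topic)) subscribers.add(e.getKey());
            }
            if (subscribers.isEmpty()) continue;
            Collections.sort(subscribers);
            int numPartitions = partitionsPerTopic.get(topic);
            int perMember = numPartitions / subscribers.size();
            int extra = numPartitions % subscribers.size();
            int next = 0;
            for (int i = 0; i < subscribers.size(); i++) {
                int count = perMember + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    assignment.get(subscribers.get(i)).add(topic + "-" + p);
                }
                next += count;
            }
        }
        return assignment;
    }

    // Client side (step 2.4): keep only this member's entry of the global assignment.
    static List<String> ownPartitions(Map<String, List<String>> globalAssignment,
                                      String memberId) {
        return globalAssignment.getOrDefault(memberId, Collections.emptyList());
    }
}
```

Because only the coordinator computes the assignment, every member reads the same result even if the members' local metadata disagrees, which is the property the proposal relies on to avoid split brain.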
>>
>> This approach has a few merits:
>> 1. Does not change the proposed binary protocol, which is still general.
>> 2. The majority of the consumers will not suffer from inconsistent metadata
>> / split brain / exploding subscription set propagation. This is
>> specifically to deal with the issue that the current proposal caters to a
>> 20% use-case while adversely impacting the more common 80% use-cases.
>> 3. Easy to implement. The only thing needed is to implement a partition
>> assignor class. For most users, the default range and roundrobin assignors
>> are good enough.
>>
>> Thoughts?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, Aug 18, 2015 at 2:51 PM, Jason Gustafson <jason@confluent.io> wrote:
>>
>> > Follow-up from the kip call:
>> >
>> > 1. Onur brought up the question of whether this protocol provides enough
>> > coordination capabilities to be generally useful in practice (is that
>> > accurate, Onur?). If it doesn't, then each use case would probably need a
>> > dependence on zookeeper anyway, and we haven't really gained anything. The
>> > group membership provided by this protocol is a useful primitive for
>> > coordination, but it's limited in the sense that everything shared among
>> > the group has to be communicated at the time the group is created. If any
>> > shared data changes, then the only way the group can ensure agreement is to
>> > force a rebalance. This is expensive since all members must stall while the
>> > rebalancing takes place. As we have also seen, there is a practical limit
>> > on the amount of metadata that can be sent through this protocol when
>> > groups get a little larger. This protocol is therefore not suitable for
>> > cases which require frequent communication or which require a large amount
>> > of data to be communicated. For the use cases listed on the wiki, neither
>> > of these appears to be an issue, but there may be other limitations which
>> > would limit reuse of the protocol. Perhaps it would be sufficient to sketch
>> > how these cases might work?
>> >
>> > 2. We talked a little bit about the issue of metadata churn. Becket brought
>> > up the interesting point that not only do we depend on topic metadata
>> > changing relatively infrequently, but we also expect timely agreement among
>> > the brokers on what that metadata is. To resolve this, we can have the
>> > consumers fetch metadata from the coordinator. We still depend on topic
>> > metadata not changing frequently, but this should resolve any disagreement
>> > among the brokers themselves. In fact, since we expect that disagreement is
>> > relatively rare, we can have the consumers fetch from the coordinator only
>> > when a disagreement occurs. The nice thing about this proposal is that
>> > it doesn't affect the join group semantics, so the coordinator would remain
>> > oblivious to the metadata used by the group for agreement. Also, if
>> > metadata churn becomes an issue, it might be possible to have the
>> > coordinator provide a snapshot for the group to ensure that a generation
>> > would be able to reach agreement (this would probably require adding
>> > groupId/generation to the metadata request).
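The "fetch from the coordinator only when a disagreement occurs" rule reduces to a small decision function. This is a hypothetical sketch: the class name, the representation of each member's metadata hash as a `long`, and the assumption that every member can see the other members' hashes (for example, through the forwarded subscriptions) are illustrative choices, not part of the proposal text.

```java
import java.util.*;

// Hypothetical sketch: each member reports a hash of the topic metadata it
// used. If every hash in the group matches, members can assign from their
// local metadata; otherwise they fall back to fetching metadata from the
// coordinator so that all of them work from the same snapshot.
final class MetadataAgreementSketch {

    enum MetadataSource { LOCAL, COORDINATOR }

    static MetadataSource chooseSource(Map<String, Long> metadataHashByMember) {
        Set<Long> distinct = new HashSet<>(metadataHashByMember.values());
        // Zero or one distinct hash means the members do not disagree.
        return distinct.size() <= 1 ? MetadataSource.LOCAL : MetadataSource.COORDINATOR;
    }
}
```

Since disagreement is expected to be rare, the common path never touches the coordinator, and the join group semantics stay unchanged.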
>> >
>> > 3. We talked briefly about support for multiple protocols in the join group
>> > request in order to allow changing the assignment strategy without
>> > downtime. I think it's a little doubtful that this would get much use in
>> > practice, but I agree it's a nice option to have on the table. An
>> > alternative, for the sake of argument, is to have each member provide only
>> > one version of the protocol, and to let the coordinator choose the protocol
>> > with the largest number of supporters. All members which can't support the
>> > selected protocol would be kicked out of the group. The drawback in a
>> > rolling upgrade is that the total capacity of the group would be
>> > momentarily halved. It would also be a little tricky to handle the case of
>> > retrying when a consumer is kicked out of the group. We wouldn't want it to
>> > be able to effect a rebalance, for example, if it would just be kicked out
>> > again. That would probably complicate the group management logic on the
>> > coordinator.
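The "largest number of supporters" alternative can be sketched as a vote over one advertised protocol per member. This is a hypothetical illustration; the class and method names are invented, and breaking ties alphabetically is an assumption the thread does not address.

```java
import java.util.*;

// Hypothetical sketch: each member advertises exactly one protocol, the
// coordinator picks the most common one, and members advertising anything
// else are removed from the group.
final class MajorityProtocolSketch {

    static String selectProtocol(Map<String, String> protocolByMember) {
        // TreeMap iteration order makes ties resolve alphabetically (an assumption).
        Map<String, Integer> votes = new TreeMap<>();
        for (String protocol : protocolByMember.values()) {
            votes.merge(protocol, 1, Integer::sum);
        }
        String best = null;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (best == null || e.getValue() > votes.get(best)) best = e.getKey();
        }
        return best; // null for an empty group
    }

    // Members whose protocol differs from the selected one are kicked out.
    static Set<String> membersToRemove(Map<String, String> protocolByMember, String selected) {
        Set<String> removed = new TreeSet<>();
        for (Map.Entry<String, String> e : protocolByMember.entrySet()) {
            if (!e.getValue().equals(selected)) removed.add(e.getKey());
        }
        return removed;
    }
}
```

In a rolling upgrade where roughly half of the members already advertise the new strategy, `membersToRemove` returns the other half, which is the momentary capacity drop described above.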
>> >
>> >
>> > Thanks,
>> > Jason
>> >
>> >
>> > On Tue, Aug 18, 2015 at 11:16 AM, Jiangjie Qin <jqin@linkedin.com.invalid> wrote:
>> >
>> > > Jun,
>> > >
>> > > Yes, I agree. If the metadata can be synced quickly there should not be an
>> > > issue. It just occurred to me that there is a proposal to allow consuming
>> > > from followers in ISR, which could potentially cause more frequent
>> > > metadata changes for consumers. Would that be an issue?
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Tue, Aug 18, 2015 at 10:22 AM, Jason Gustafson <jason@confluent.io> wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > Answers below:
>> > > >
>> > > > 1. When there are multiple common protocols in the JoinGroupRequest,
>> > > > which one would the coordinator pick?
>> > > >
>> > > > I was intending to use the list to indicate preference. If all group
>> > > > members support protocols ["A", "B"] in that order, then we will choose
>> > > > "A." If some support ["B", "A"], then we would either choose based on
>> > > > respective counts or just randomly. The main use case of supporting the
>> > > > list is for rolling upgrades when a change is made to the assignment
>> > > > strategy. In that case, the new assignment strategy would be listed
>> > > > first in the upgraded client. I think it's debatable whether this
>> > > > feature would get much use in practice, so we might consider dropping it.
>> > > >
>> > > > 2. If the protocols don't agree, the group construction fails. What
>> > > > exactly does it mean? Do we send an error in every JoinGroupResponse and
>> > > > remove all members in the group in the coordinator?
>> > > >
>> > > > Yes, that is right. It would be handled similarly to inconsistent
>> > > > assignment strategies in the current protocol. The coordinator returns
>> > > > an error in each join group response, and the client propagates the
>> > > > error to the user.
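Answers 1 and 2 together describe a selection rule over preference-ordered lists with a failure case. The sketch below is hypothetical: it picks the "based on respective counts" option rather than random choice, counts each member's most preferred protocol among those every member supports, breaks ties alphabetically (an assumption), and returns an empty result when no protocol is common, in which case every JoinGroupResponse would carry an error.

```java
import java.util.*;

// Hypothetical sketch of preference-ordered protocol selection. Only
// protocols supported by every member are candidates; each member votes for
// its most preferred candidate, and the highest count wins. An empty result
// means the protocols do not agree and group construction fails.
final class PreferenceProtocolSketch {

    static Optional<String> select(Map<String, List<String>> preferencesByMember) {
        if (preferencesByMember.isEmpty()) return Optional.empty();
        // Intersect all members' protocol lists.
        Set<String> common = null;
        for (List<String> prefs : preferencesByMember.values()) {
            if (common == null) common = new HashSet<>(prefs);
            else common.retainAll(prefs);
        }
        if (common.isEmpty()) return Optional.empty();
        // Each member votes for its highest-ranked common protocol.
        Map<String, Integer> votes = new TreeMap<>();
        for (List<String> prefs : preferencesByMember.values()) {
            for (String protocol : prefs) {
                if (common.contains(protocol)) {
                    votes.merge(protocol, 1, Integer::sum);
                    break;
                }
            }
        }
        // Highest vote count wins; TreeMap order breaks ties alphabetically.
        String best = null;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (best == null || e.getValue() > votes.get(best)) best = e.getKey();
        }
        return Optional.of(best);
    }
}
```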
>> > > >
>> > > > 3. Consumer embedded protocol: The proposal has two different formats of
>> > > > subscription depending on whether wildcards are used or not. This seems
>> > > > a bit complicated. Would it be better to always use the metadata hash?
>> > > > The clients know the subscribed topics already. This way, the client
>> > > > code behaves the same whether wildcards are used or not.
>> > > >
>> > > > Yeah, I think this is possible (Neha also suggested it). I haven't
>> > > > updated the wiki yet, but the patch I started working on uses only the
>> > > > metadata hash. In the case that an explicit topic list is provided, the
>> > > > hash just covers the metadata for those topics.
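A metadata hash that covers only the subscribed topics lets the same client code handle both explicit lists and wildcards. This is a hypothetical sketch: the class name, the use of partition counts as the hashed metadata, and the simple 31-multiplier hash are assumptions (a real implementation might hash more fields or use a stronger digest).

```java
import java.util.*;
import java.util.regex.Pattern;

// Hypothetical sketch of a subscription-scoped metadata hash. The hash
// covers only the topics the member subscribes to: either an explicit list
// or every topic matching a wildcard pattern. Topics are sorted so that
// members holding the same metadata compute the same value.
final class SubscriptionHashSketch {

    static long hash(Map<String, Integer> partitionsPerTopic, Collection<String> explicitTopics) {
        return hashTopics(partitionsPerTopic, new TreeSet<>(explicitTopics));
    }

    static long hash(Map<String, Integer> partitionsPerTopic, Pattern wildcard) {
        SortedSet<String> matched = new TreeSet<>();
        for (String topic : partitionsPerTopic.keySet()) {
            if (wildcard.matcher(topic).matches()) matched.add(topic);
        }
        return hashTopics(partitionsPerTopic, matched);
    }

    private static long hashTopics(Map<String, Integer> partitionsPerTopic, SortedSet<String> topics) {
        long h = 17;
        for (String topic : topics) {
            // A subscribed topic missing from the metadata contributes -1.
            int partitions = partitionsPerTopic.getOrDefault(topic, -1);
            h = 31 * h + topic.hashCode();
            h = 31 * h + partitions;
        }
        return h;
    }
}
```

Changes to topics outside the subscription leave the hash untouched, so they do not trigger a disagreement.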
>> > > >
>> > > >
>> > > > Thanks,
>> > > > Jason
>> > > >
>> > > >
>> > > >
>> > > > On Tue, Aug 18, 2015 at 10:06 AM, Jun Rao <jun@confluent.io> wrote:
>> > > >
>> > > > > Jason,
>> > > > >
>> > > > > Thanks for the writeup. A few comments below.
>> > > > >
>> > > > > 1. When there are multiple common protocols in the JoinGroupRequest,
>> > > > > which one would the coordinator pick?
>> > > > > 2. If the protocols don't agree, the group construction fails. What
>> > > > > exactly does it mean? Do we send an error in every JoinGroupResponse
>> > > > > and remove all members in the group in the coordinator?
>> > > > > 3. Consumer embedded protocol: The proposal has two different formats
>> > > > > of subscription depending on whether wildcards are used or not. This
>> > > > > seems a bit complicated. Would it be better to always use the metadata
>> > > > > hash? The clients know the subscribed topics already. This way, the
>> > > > > client code behaves the same whether wildcards are used or not.
>> > > > >
>> > > > > Jiangjie,
>> > > > >
>> > > > > With respect to rebalance churn due to topics being created/deleted:
>> > > > > with the new consumer, the rebalance can probably settle within 200ms
>> > > > > when there is a topic change. So, as long as we are not changing
>> > > > > topics more than 5 times per sec, there shouldn't be constant churn,
>> > > > > right?
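Jun's bound is simple arithmetic, assuming each rebalance takes about 200 ms and rebalances triggered by topic changes run one after another rather than overlapping:

```latex
\[
r_{\max} = \frac{1\ \text{s}}{200\ \text{ms}}
         = \frac{1000\ \text{ms}}{200\ \text{ms}}
         = 5\ \text{topic changes per second}
\]
```

Above this rate a new rebalance would be triggered before the previous one settles, so the group would never reach a stable assignment.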
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <jason@confluent.io> wrote:
>> > > > >
>> > > > > > Hi Kafka Devs,
>> > > > > >
>> > > > > > One of the nagging issues in the current design of the new
>> > > > > > consumer has been the need to support a variety of assignment
>> > > > > > strategies. We've encountered this in particular in the design of
>> > > > > > copycat and the processing framework (KIP-28). From what I
>> > > > > > understand, Samza also has a number of use cases with custom
>> > > > > > assignment needs. The new consumer protocol supports new
>> > > > > > assignment strategies by hooking them into the broker. For many
>> > > > > > environments, this is a major pain and in some cases, a
>> > > > > > non-starter. It also challenges the validation that the
>> > > > > > coordinator can provide. For example, some assignment strategies
>> > > > > > call for partitions to be assigned multiple times, which means
>> > > > > > that the coordinator can only check that partitions have been
>> > > > > > assigned at least once.
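The "assigned at least once" check is a coverage test that tolerates duplicates. This is a hypothetical sketch; the class name and the "topic-N" string form for partitions are illustrative assumptions.

```java
import java.util.*;

// Hypothetical sketch of the only check a coordinator can apply when a
// strategy may assign a partition to several members: every partition must
// appear in the assignment at least once. Duplicates are allowed, so
// "exactly once" is not enforced.
final class CoverageCheckSketch {

    static Set<String> unassignedPartitions(Set<String> allPartitions,
                                            Map<String, List<String>> assignment) {
        Set<String> missing = new TreeSet<>(allPartitions);
        for (List<String> partitions : assignment.values()) {
            missing.removeAll(partitions);
        }
        return missing; // empty means every partition is covered at least once
    }
}
```

An assignment that gives the same partition to two members passes this check, which is exactly why the coordinator cannot say much more about correctness.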
>> > > > > >
>> > > > > > To solve these issues, we'd like to propose moving assignment to
>> > > > > > the client. I've written a wiki which outlines some protocol
>> > > > > > changes to achieve this:
>> > > > > >
>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
>> > > > > > To summarize briefly, instead of the coordinator assigning the
>> > > > > > partitions itself, all subscriptions are forwarded to each member
>> > > > > > of the group, which then decides independently which partitions it
>> > > > > > should consume. The protocol provides a mechanism for the
>> > > > > > coordinator to validate that all consumers use the same assignment
>> > > > > > strategy, but it does not ensure that the resulting assignment is
>> > > > > > "correct." This provides a powerful capability for users to
>> > > > > > control the full data flow on the client side. They control how
>> > > > > > data is written to partitions through the Partitioner interface
>> > > > > > and they control how data is consumed through the assignment
>> > > > > > strategy, all without touching the server.
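Client-side assignment works only if every member runs the same deterministic strategy over the same forwarded subscriptions. This hypothetical sketch uses round-robin over sorted members and partitions purely for illustration; the class name, the "topic-N" partition strings, and the assumption that all members subscribe to the same topics are not from the proposal.

```java
import java.util.*;

// Hypothetical sketch: each member receives all subscriptions, runs the same
// deterministic round-robin locally, and keeps only its own share. Sorting
// members and topics is what makes independent computations agree.
final class ClientSideRoundRobinSketch {

    static List<String> myPartitions(String memberId,
                                     Set<String> memberIds,
                                     Map<String, Integer> partitionsPerTopic) {
        List<String> members = new ArrayList<>(new TreeSet<>(memberIds));
        int myIndex = members.indexOf(memberId);
        List<String> mine = new ArrayList<>();
        if (myIndex < 0) return mine; // not a member of this generation
        int position = 0;
        for (String topic : new TreeSet<>(partitionsPerTopic.keySet())) {
            for (int p = 0; p < partitionsPerTopic.get(topic); p++) {
                if (position % members.size() == myIndex) mine.add(topic + "-" + p);
                position++;
            }
        }
        return mine;
    }
}
```

Nothing on the broker checks the result: if two members computed from different metadata, their shares could overlap or leave gaps, which is why metadata agreement gets so much attention in this thread.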
>> > > > > >
>> > > > > > Of course nothing comes for free. In particular, this change
>> > > > > > removes the ability of the coordinator to validate that commits
>> > > > > > are made by consumers who were assigned the respective partition.
>> > > > > > This might not be too bad since we retain the ability to validate
>> > > > > > the generation id, but it is a potential concern. We have
>> > > > > > considered alternative protocols which add a second round-trip to
>> > > > > > the protocol in order to give the coordinator the ability to
>> > > > > > confirm the assignment. As mentioned above, the coordinator is
>> > > > > > somewhat limited in what it can actually validate, but this would
>> > > > > > restore its ability to validate commits. The tradeoff is that it
>> > > > > > increases the protocol's complexity, which means more ways for the
>> > > > > > protocol to fail and consequently more edge cases in the code.
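The difference between the two commit checks can be sketched side by side. This is a hypothetical illustration: the class and method names are invented, and the "confirmed assignment" map stands in for whatever the coordinator would learn from the extra round-trip.

```java
import java.util.*;

// Hypothetical sketch contrasting the two commit checks. With client-side
// assignment the coordinator only knows the current generation; with a
// second round-trip it would also know the assignment and could verify
// partition ownership.
final class CommitValidationSketch {

    // Client-side assignment: only the generation id can be validated.
    static boolean acceptGenerationOnly(int currentGeneration, int commitGeneration) {
        return currentGeneration == commitGeneration;
    }

    // With a confirmed assignment: the generation must match and the member
    // must own the partition it commits for.
    static boolean acceptWithOwnership(int currentGeneration, int commitGeneration,
                                       Map<String, Set<String>> confirmedAssignment,
                                       String memberId, String partition) {
        Set<String> owned = confirmedAssignment.getOrDefault(memberId, Collections.emptySet());
        return currentGeneration == commitGeneration && owned.contains(partition);
    }
}
```

Under the generation-only check, a member of the current generation can commit offsets for a partition it was never assigned; the ownership check would reject that commit.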
>> > > > > >
>> > > > > > It also misses an opportunity to generalize the group membership
>> > > > > > protocol for additional use cases. In fact, after you've gone to
>> > > > > > the trouble of moving assignment to the client, the main thing
>> > > > > > that is left in this protocol is basically a general group
>> > > > > > management capability. This is exactly what is needed for a few
>> > > > > > cases that are currently under discussion (e.g. copycat or
>> > > > > > single-writer producer). We've taken this a step further in the
>> > > > > > proposal and attempted to envision what that general protocol
>> > > > > > might look like and how it could be used both by the consumer and
>> > > > > > for some of these other cases.
>> > > > > >
>> > > > > > Anyway, since time is running out on the new consumer, we have
>> > > > > > perhaps one last chance to consider a significant change in the
>> > > > > > protocol like this, so have a look at the wiki and share your
>> > > > > > thoughts. I've no doubt that some ideas seem clearer in my mind
>> > > > > > than they do on paper, so ask questions if there is any confusion.
>> > > > > >
>> > > > > > Thanks!
>> > > > > > Jason
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
>
> --
> Thanks,
> Neha
