From: Joel Koshy
To: "dev@kafka.apache.org"
Subject: Re: [DISCUSS] Client-side Assignment for New Consumer
Date: Fri, 28 Aug 2015 02:50:47 -0700

> I think we think this proposal addresses 100% of the split brain issues ever seen in the ZK-based protocol, but I think you think there are still issues. Can you explain what you're thinking of and when you think it would happen? I want to make sure you aren't assuming client-side=>split-brain since I think that is totally not the case.

Yes, I had concluded that client-side assignment would still result in split-brain wrt partition counts, but I overlooked a key sentence in the wiki - i.e., that the assignment algorithm for consumers can just use the largest number of partitions for each topic reported by any of the consumers. i.e., I assumed that consumers would just fail rebalance if the partition counts were inconsistent, but that is not the case since this conflict can be easily resolved as described without further join-group requests. Sorry about that.
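
A minimal sketch of the resolution rule described above, assuming each consumer reports its own view of the partition count per topic and every member then uses the largest count reported for each topic. The class and method names (`PartitionCountMerge`, `mergePartitionCounts`) and the topic names are hypothetical; they are not taken from the wiki or from the Kafka codebase.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionCountMerge {
    // Hypothetical helper: combine per-consumer views (topic -> partition count)
    // by keeping the largest count reported for each topic.
    static Map<String, Integer> mergePartitionCounts(List<Map<String, Integer>> reportedByConsumers) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> view : reportedByConsumers) {
            for (Map.Entry<String, Integer> entry : view.entrySet()) {
                // Integer::max resolves a disagreement in favour of the larger count.
                merged.merge(entry.getKey(), entry.getValue(), Integer::max);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Two consumers with inconsistent metadata for the (made-up) topic "orders".
        Map<String, Integer> consumerA = Map.of("orders", 8, "clicks", 4);
        Map<String, Integer> consumerB = Map.of("orders", 6, "clicks", 4, "audit", 2);

        Map<String, Integer> merged = mergePartitionCounts(List.of(consumerA, consumerB));
        System.out.println("orders=" + merged.get("orders")); // prints orders=8
        System.out.println("clicks=" + merged.get("clicks")); // prints clicks=4
        System.out.println("audit=" + merged.get("audit"));   // prints audit=2
    }
}
```

Because every member applies the same deterministic rule to the same set of reported views, all members arrive at the same merged counts, which is why no extra join-group round is needed in this sketch.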
There is still the issue of the coordinator having to send back n*m worth of metadata, but that was not my biggest concern. I'll look over it again and reply back tomorrow.

Joel

On Thu, Aug 27, 2015 at 2:55 PM, Jay Kreps wrote:

> Hey Joel,
>
> I really don't think we should do both. There are pros and cons but we should make a decision and work on operationalizing one approach. Much of really making something like this work is getting all the bugs out, getting monitoring in place, getting rigorous system tests in place. Trying to do those things twice with the same resources will just mean we do them half as well. I also think this buys nothing from the user's point of view--they want co-ordination that works correctly, the debate we are having is purely a "how should we build that" debate. So this is really not the kind of thing we'd want to make pluggable and if we did that would just complicate life for the user.
>
> I think we think this proposal addresses 100% of the split brain issues ever seen in the ZK-based protocol, but I think you think there are still issues. Can you explain what you're thinking of and when you think it would happen? I want to make sure you aren't assuming client-side=>split-brain since I think that is totally not the case.
>
> With respect to "herd issues" I actually think all the proposals address this by scaling the co-ordinator out to all nodes and making the co-ordination vastly cheaper. No proposal, of course, gets rid of the fact that all clients rejoin at once when there is a membership change, but that is kind of fundamental to the problem.
>
> -Jay
>
> On Thu, Aug 27, 2015 at 2:02 PM, Joel Koshy wrote:
>
>> I actually feel this set of tests (whatever they may be) is somewhat irrelevant here.
>> My main concern with the current client-side proposal (i.e., without Becket's follow-up suggestions) is that it makes a significant compromise to the original charter of the new consumer - i.e., reduce/eliminate herd and split brain problems in both group management and partition assignment. I understand the need for client-side partition assignment in some use cases (which we are also interested in), but I also think we should make every effort to keep full server-side coordination for the remaining (majority) of use cases especially if it does not complicate the protocol. The proposed changes do not complicate the protocol IMO - i.e., there is no further modification to the request/response formats beyond the current client-side proposal. It only involves a trivial reinterpretation of the content of the protocol metadata field.
>>
>> Joel
>>
>> On Wed, Aug 26, 2015 at 9:33 PM, Neha Narkhede wrote:
>> > Hey Becket,
>> >
>> >> In that case, the broker side partition assignment would be ideal because it avoids issues like metadata inconsistency / split brain / exploding subscription set propagation.
>> >
>> > As per our previous discussions regarding each of those concerns (referring to this email thread, KIP calls and JIRA comments), we are going to run a set of tests using the LinkedIn deployment numbers that we will wait for you to share. The purpose is to see if those concerns are really valid or not. I'd prefer to see that before making any more changes that will complicate the protocol.
>> >
>> > On Wed, Aug 26, 2015 at 4:57 PM, Jiangjie Qin wrote:
>> >
>> >> Hi folks,
>> >>
>> >> After further discussion in LinkedIn, we found that while having a more general group management protocol is very useful, the vast majority of the clients will not use customized partition assignment strategy.
>> >> In that case, the broker side partition assignment would be ideal because it avoids issues like metadata inconsistency / split brain / exploding subscription set propagation.
>> >>
>> >> So we have the following proposal that satisfies the majority of the clients' needs without changing the currently proposed binary protocol. i.e., Continue to support broker-side assignment if the assignment strategy is recognized by the coordinator.
>> >>
>> >> 1. Keep the binary protocol as currently proposed.
>> >>
>> >> 2. Change the way we interpret ProtocolMetadata:
>> >> 2.1 On consumer side, change partition.assignment.strategy to partition.assignor.class. Implement something like the following PartitionAssignor interface:
>> >>
>> >>     public interface PartitionAssignor {
>> >>         List protocolTypes();
>> >>         byte[] protocolMetadata();
>> >>         // return the Topic->List map that are assigned to this consumer.
>> >>         List assignPartitions(String protocolType, byte[] responseProtocolMetadata);
>> >>     }
>> >>
>> >>     public abstract class AbstractPartitionAssignor implements PartitionAssignor {
>> >>         protected final KafkaConsumer consumer;
>> >>         AbstractPartitionAssignor(KafkaConsumer consumer) {
>> >>             this.consumer = consumer;
>> >>         }
>> >>     }
>> >>
>> >> 2.2 The ProtocolMetadata in JoinGroupRequest will be partitionAssignor.protocolMetadata(). When partition.assignor.class is "range" or "roundrobin", the ProtocolMetadata in JoinGroupRequest will be a JSON subscription set. ("range", "roundrobin" will be reserved words, we can also consider reserving some Prefix such as "broker-" to be more clear)
>> >> 2.3 On broker side when ProtocolType is "range" or "roundrobin", coordinator will parse the ProtocolMetadata in the JoinGroupRequest and assign the partitions for consumers. In the JoinGroupResponse, the ProtocolMetadata will be the global assignment of partitions.
>> >> 2.4 On client side, after receiving the JoinGroupResponse, partitionAssignor.assignPartitions() will be invoked to return the actual assignment. If the assignor is RangeAssignor or RoundRobinAssignor, they will parse the assignment from the ProtocolMetadata returned by coordinator.
>> >>
>> >> This approach has a few merits:
>> >> 1. Does not change the proposed binary protocol, which is still general.
>> >> 2. The majority of the consumers will not suffer from inconsistent metadata / split brain / exploding subscription set propagation. This is specifically to deal with the issue that the current proposal caters to a 20% use-case while adversely impacting the more common 80% use-cases.
>> >> 3. Easy to implement. The only thing needed is to implement a partitioner class. For most users, the default range and roundrobin partitioner are good enough.
>> >>
>> >> Thoughts?
>> >>
>> >> Thanks,
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On Tue, Aug 18, 2015 at 2:51 PM, Jason Gustafson wrote:
>> >>
>> >> > Follow-up from the kip call:
>> >> >
>> >> > 1. Onur brought up the question of whether this protocol provides enough coordination capabilities to be generally useful in practice (is that accurate, Onur?). If it doesn't, then each use case would probably need a dependence on zookeeper anyway, and we haven't really gained anything. The group membership provided by this protocol is a useful primitive for coordination, but it's limited in the sense that everything shared among the group has to be communicated at the time the group is created. If any shared data changes, then the only way the group can ensure agreement is to force a rebalance.
>> >> > This is expensive since all members must stall while the rebalancing takes place. As we have also seen, there is a practical limit on the amount of metadata that can be sent through this protocol when groups get a little larger. This protocol is therefore not suitable to cases which require frequent communication or which require a large amount of data to be communicated. For the use cases listed on the wiki, neither of these appear to be an issue, but there may be other limitations which would limit reuse of the protocol. Perhaps it would be sufficient to sketch how these cases might work?
>> >> >
>> >> > 2. We talked a little bit about the issue of metadata churn. Becket brought up the interesting point that not only do we depend on topic metadata changing relatively infrequently, but we also expect timely agreement among the brokers on what that metadata is. To resolve this, we can have the consumers fetch metadata from the coordinator. We still depend on topic metadata not changing frequently, but this should resolve any disagreement among the brokers themselves. In fact, since we expect that disagreement is relatively rare, we can have the consumers fetch from the coordinator only when a disagreement occurs. The nice thing about this proposal is that it doesn't affect the join group semantics, so the coordinator would remain oblivious to the metadata used by the group for agreement. Also, if metadata churn becomes an issue, it might be possible to have the coordinator provide a snapshot for the group to ensure that a generation would be able to reach agreement (this would probably require adding groupId/generation to the metadata request).
>> >> >
>> >> > 3. We talked briefly about support for multiple protocols in the join group request in order to allow changing the assignment strategy without downtime. I think it's a little doubtful that this would get much use in practice, but I agree it's a nice option to have on the table. An alternative, for the sake of argument, is to have each member provide only one version of the protocol, and to let the coordinator choose the protocol with the largest number of supporters. All members which can't support the selected protocol would be kicked out of the group. The drawback in a rolling upgrade is that the total capacity of the group would be momentarily halved. It would also be a little tricky to handle the case of retrying when a consumer is kicked out of the group. We wouldn't want it to be able to effect a rebalance, for example, if it would just be kicked out again. That would probably complicate the group management logic on the coordinator.
>> >> >
>> >> > Thanks,
>> >> > Jason
>> >> >
>> >> > On Tue, Aug 18, 2015 at 11:16 AM, Jiangjie Qin wrote:
>> >> >
>> >> > > Jun,
>> >> > >
>> >> > > Yes, I agree. If the metadata can be synced quickly there should not be an issue. It just occurred to me that there is a proposal to allow consuming from followers in ISR, that could potentially cause more frequent metadata change for consumers. Would that be an issue?
>> >> > >
>> >> > > Thanks,
>> >> > >
>> >> > > Jiangjie (Becket) Qin
>> >> > >
>> >> > > On Tue, Aug 18, 2015 at 10:22 AM, Jason Gustafson <jason@confluent.io> wrote:
>> >> > >
>> >> > > > Hi Jun,
>> >> > > >
>> >> > > > Answers below:
>> >> > > >
>> >> > > > 1. When there are multiple common protocols in the JoinGroupRequest, which one would the coordinator pick?
>> >> > > >
>> >> > > > I was intending to use the list to indicate preference. If all group members support protocols ["A", "B"] in that order, then we will choose "A." If some support ["B", "A"], then we would either choose based on respective counts or just randomly. The main use case of supporting the list is for rolling upgrades when a change is made to the assignment strategy. In that case, the new assignment strategy would be listed first in the upgraded client. I think it's debatable whether this feature would get much use in practice, so we might consider dropping it.
>> >> > > >
>> >> > > > 2. If the protocols don't agree, the group construction fails. What exactly does it mean? Do we send an error in every JoinGroupResponse and remove all members in the group in the coordinator?
>> >> > > >
>> >> > > > Yes, that is right. It would be handled similarly to inconsistent assignment strategies in the current protocol. The coordinator returns an error in each join group response, and the client propagates the error to the user.
>> >> > > >
>> >> > > > 3. Consumer embedded protocol: The proposal has two different formats of subscription depending on whether wildcards are used or not. This seems a bit complicated. Would it be better to always use the metadata hash? The clients know the subscribed topics already. This way, the client code behaves the same whether wildcards are used or not.
>> >> > > >
>> >> > > > Yeah, I think this is possible (Neha also suggested it).
>> >> > > > I haven't updated the wiki yet, but the patch I started working on uses only the metadata hash. In the case that an explicit topic list is provided, the hash just covers the metadata for those topics.
>> >> > > >
>> >> > > > Thanks,
>> >> > > > Jason
>> >> > > >
>> >> > > > On Tue, Aug 18, 2015 at 10:06 AM, Jun Rao wrote:
>> >> > > >
>> >> > > > > Jason,
>> >> > > > >
>> >> > > > > Thanks for the writeup. A few comments below.
>> >> > > > >
>> >> > > > > 1. When there are multiple common protocols in the JoinGroupRequest, which one would the coordinator pick?
>> >> > > > > 2. If the protocols don't agree, the group construction fails. What exactly does it mean? Do we send an error in every JoinGroupResponse and remove all members in the group in the coordinator?
>> >> > > > > 3. Consumer embedded protocol: The proposal has two different formats of subscription depending on whether wildcards are used or not. This seems a bit complicated. Would it be better to always use the metadata hash? The clients know the subscribed topics already. This way, the client code behaves the same whether wildcards are used or not.
>> >> > > > >
>> >> > > > > Jiangjie,
>> >> > > > >
>> >> > > > > With respect to rebalance churns due to topics being created/deleted. With the new consumer, the rebalance can probably settle within 200ms when there is a topic change. So, as long as we are not changing topic more than 5 times per sec, there shouldn't be constant churns, right?
>> >> > > > >
>> >> > > > > Thanks,
>> >> > > > >
>> >> > > > > Jun
>> >> > > > >
>> >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <jason@confluent.io> wrote:
>> >> > > > >
>> >> > > > > > Hi Kafka Devs,
>> >> > > > > >
>> >> > > > > > One of the nagging issues in the current design of the new consumer has been the need to support a variety of assignment strategies. We've encountered this in particular in the design of copycat and the processing framework (KIP-28). From what I understand, Samza also has a number of use cases with custom assignment needs. The new consumer protocol supports new assignment strategies by hooking them into the broker. For many environments, this is a major pain and in some cases, a non-starter. It also challenges the validation that the coordinator can provide. For example, some assignment strategies call for partitions to be assigned multiple times, which means that the coordinator can only check that partitions have been assigned at least once.
>> >> > > > > >
>> >> > > > > > To solve these issues, we'd like to propose moving assignment to the client. I've written a wiki which outlines some protocol changes to achieve this:
>> >> > > > > >
>> >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
>> >> > > > > >
>> >> > > > > > To summarize briefly, instead of the coordinator assigning the partitions itself, all subscriptions are forwarded to each member of the group which then decides independently which partitions it should consume. The protocol provides a mechanism for the coordinator to validate that all consumers use the same assignment strategy, but it does not ensure that the resulting assignment is "correct." This provides a powerful capability for users to control the full data flow on the client side. They control how data is written to partitions through the Partitioner interface and they control how data is consumed through the assignment strategy, all without touching the server.
>> >> > > > > >
>> >> > > > > > Of course nothing comes for free. In particular, this change removes the ability of the coordinator to validate that commits are made by consumers who were assigned the respective partition. This might not be too bad since we retain the ability to validate the generation id, but it is a potential concern. We have considered alternative protocols which add a second round-trip to the protocol in order to give the coordinator the ability to confirm the assignment. As mentioned above, the coordinator is somewhat limited in what it can actually validate, but this would return its ability to validate commits.
>> >> > > > > > The tradeoff is that it increases the protocol's complexity which means more ways for the protocol to fail and consequently more edge cases in the code.
>> >> > > > > >
>> >> > > > > > It also misses an opportunity to generalize the group membership protocol for additional use cases. In fact, after you've gone to the trouble of moving assignment to the client, the main thing that is left in this protocol is basically a general group management capability. This is exactly what is needed for a few cases that are currently under discussion (e.g. copycat or single-writer producer). We've taken this further step in the proposal and attempted to envision what that general protocol might look like and how it could be used both by the consumer and for some of these other cases.
>> >> > > > > >
>> >> > > > > > Anyway, since time is running out on the new consumer, we have perhaps one last chance to consider a significant change in the protocol like this, so have a look at the wiki and share your thoughts. I've no doubt that some ideas seem clearer in my mind than they do on paper, so ask questions if there is any confusion.
>> >> > > > > >
>> >> > > > > > Thanks!
>> >> > > > > > Jason
>> >
>> > --
>> > Thanks,
>> > Neha
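
The protocol-selection rule discussed in the thread (each member lists its supported protocols in preference order; the coordinator picks one that every member supports, and group construction fails if there is none) can be sketched roughly as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, the vote counting is one possible reading of "choose based on respective counts", and the deterministic tie-break (order of the first member's list) is an assumption, since the thread also allows choosing randomly.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class ProtocolSelection {
    // Hypothetical helper: returns the chosen protocol, or Optional.empty()
    // when the members share no protocol (the "group construction fails" case).
    static Optional<String> selectProtocol(List<List<String>> memberPreferences) {
        if (memberPreferences.isEmpty()) {
            return Optional.empty();
        }
        // Protocols supported by every member, kept in the first member's order.
        Set<String> common = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> prefs : memberPreferences) {
            common.retainAll(prefs);
        }
        if (common.isEmpty()) {
            return Optional.empty();
        }
        // Each member votes for its most preferred protocol among the common ones.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (String candidate : common) {
            votes.put(candidate, 0);
        }
        for (List<String> prefs : memberPreferences) {
            for (String protocol : prefs) {
                if (common.contains(protocol)) {
                    votes.merge(protocol, 1, Integer::sum);
                    break;
                }
            }
        }
        // Highest vote count wins; ties fall back to the first member's order
        // (an assumption made here to keep the sketch deterministic).
        String best = null;
        for (Map.Entry<String, Integer> entry : votes.entrySet()) {
            if (best == null || entry.getValue() > votes.get(best)) {
                best = entry.getKey();
            }
        }
        return Optional.of(best);
    }

    public static void main(String[] args) {
        // All members prefer "A" over "B": "A" is chosen.
        System.out.println(selectProtocol(List.of(List.of("A", "B"), List.of("A", "B"))));
        // Mid rolling upgrade: two of three members already list "B" first.
        System.out.println(selectProtocol(List.of(List.of("A", "B"), List.of("B", "A"), List.of("B", "A"))));
        // No protocol in common: the coordinator would return an error to every member.
        System.out.println(selectProtocol(List.of(List.of("A"), List.of("B"))));
    }
}
```

In a rolling upgrade the upgraded clients list the new strategy first, so a selection of this kind switches over once enough members prefer it, without ever picking a protocol that some member cannot speak.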