From: Jun Rao
Date: Mon, 11 Dec 2017 13:51:46 -0800
Subject: Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field
To: dev@kafka.apache.org

Hi, Dong,

Thanks for the reply. My suggestion of forcing the metadata refresh from the controller may not work in general, since the cached controller could be outdated too. The general problem is that if a consumer's metadata is outdated, it may get stuck with the old leader for a long time. We can address the issue of detecting outdated metadata in a separate KIP in the future if you didn't intend to address it in this KIP.

Thanks,

Jun

On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin wrote:

> Hey Jun,
>
> Thanks much for your comments. Given that the client needs to de-serialize the metadata anyway, the extra overhead of checking the per-partition version for every partition should not be a big concern. Thus it makes sense to use the leader epoch as the per-partition version instead of creating a global metadata version. I will update the KIP to do that.
>
> Regarding the detection of outdated metadata, I think it is possible to ensure that the client gets the latest metadata by fetching from the controller. Note that this requires extra logic in the controller so that the controller updates metadata directly in memory without requiring an UpdateMetadataRequest. But I am not sure we have a strong motivation for this at the moment, and it would make the controller more of a bottleneck in the cluster, which we probably want to avoid.
>
> I think we can probably keep the current way of ensuring metadata freshness. Currently the client is forced to refresh metadata if a broker returns an error (e.g. NotLeaderForPartition) due to outdated metadata, or if the metadata does not contain the partition that the client needs. In the future, as you previously suggested, we can include the per-partition leaderEpoch in the FetchRequest/ProduceRequest so that the broker can return an error if the epoch is smaller than the epoch cached in the broker. Given that this adds more complexity to Kafka, I think we can probably think about that later, when we have a specific use case or problem to solve with up-to-date metadata. Does this sound OK?
>
> Thanks,
> Dong
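(For illustration, a minimal sketch of the per-partition check Dong describes above: the client keeps the last leader epoch it has seen for each partition and only applies a metadata entry whose epoch has not gone backwards. The class and method names are assumptions for this example, not the actual Kafka client internals.)

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only: keep the last leader epoch observed per
    // partition and ignore metadata entries whose epoch went backwards.
    public class PartitionEpochCache {

        private final Map<String, Integer> lastSeenLeaderEpoch = new HashMap<>();

        // Returns true if the partition entry from a MetadataResponse should
        // be applied, i.e. its leader epoch is at least the cached epoch.
        public synchronized boolean maybeUpdate(String topicPartition, int leaderEpoch) {
            Integer cached = lastSeenLeaderEpoch.get(topicPartition);
            if (cached != null && leaderEpoch < cached) {
                return false; // stale entry from a lagging broker; keep what we have
            }
            lastSeenLeaderEpoch.put(topicPartition, leaderEpoch);
            return true;
        }

        public static void main(String[] args) {
            PartitionEpochCache cache = new PartitionEpochCache();
            System.out.println(cache.maybeUpdate("test-0", 5)); // true: first observation
            System.out.println(cache.maybeUpdate("test-0", 4)); // false: epoch went backwards
            System.out.println(cache.maybeUpdate("test-0", 6)); // true: newer epoch
        }
    }

(The point of the comparison is that a response served from a broker with a stale metadata cache is simply ignored for that partition instead of overwriting newer state.)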
>
> On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao wrote:
>
> > Hi, Dong,
> >
> > Thanks for the reply. A few more points below.
> >
> > For dealing with how to prevent a consumer from switching from a new leader to an old leader, your suggestion of refreshing metadata on consumer restart until it sees a metadata version >= the one associated with the offset works too, as long as we guarantee that the cached metadata versions on the brokers only go up.
> >
> > The second discussion point is whether the metadata versioning should be per partition or global. For the partition-level versioning, you were concerned about the performance. Given that metadata updates are rare, I am not sure that's a big concern though; doing a million if tests is probably going to take less than 1 ms. Another thing is that the metadata version seems to need to survive controller failover. In your current approach, a consumer may not be able to wait on the right version of the metadata after a restart, since the metadata version may have been recycled on the server side due to a controller failover while the consumer was down. The partition-level leaderEpoch survives controller failure and won't have this issue.
> >
> > Lastly, neither your proposal nor mine addresses how to guarantee that a consumer detects that its metadata is outdated. Currently, the consumer is not guaranteed to fetch metadata from every broker within some bounded period of time. Maybe this is out of the scope of your KIP, but one idea is to force the consumer to refresh metadata from the controller periodically.
> >
> > Jun
> >
> > On Thu, Dec 7, 2017 at 11:25 AM, Dong Lin wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks much for the comments. Great point, particularly regarding (3). I hadn't thought about this before.
> > >
> > > It seems that there are two possible ways the version number can be used. One solution is for the client to check the version number at the time it receives a MetadataResponse: if the version number in the MetadataResponse is smaller than the version number in the client's cache, the client is forced to fetch metadata again. The other solution, as you have suggested, is for the broker to check the version number at the time it receives a request from the client, and to reject the request if the version is smaller than the version in the broker's cache.
> > >
> > > I am not very sure that the second solution can address the problem here. In the scenario described in the JIRA ticket, the broker's cache may be outdated because it has not processed the LeaderAndIsrRequest from the controller. Thus it may still process the client's request even if the version in the client's request is actually outdated. Does this make sense?
> > >
> > > IMO, it seems that we can address problem (3) by saving the metadata version together with the offset. After the consumer starts, it will keep fetching metadata until the metadata version >= the version saved with the offset of this partition.
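(As a rough illustration of the restart handling described in the previous paragraph, and in Jun's first point above: the consumer stores the version, here the leader epoch, next to the committed offset and refreshes metadata after a restart until its cache catches up. OffsetAndEpoch and MetadataView below are hypothetical helpers, not existing Kafka classes.)

    // Illustrative sketch only: pair the committed offset with the metadata
    // version (here, the leader epoch) and, after a restart, refresh metadata
    // until the cached epoch is at least the saved one before fetching.
    public class RestartFencingSketch {

        static final class OffsetAndEpoch {
            final long offset;
            final int leaderEpoch;
            OffsetAndEpoch(long offset, int leaderEpoch) {
                this.offset = offset;
                this.leaderEpoch = leaderEpoch;
            }
        }

        // Hypothetical view of the client's metadata cache.
        interface MetadataView {
            void requestRefresh();                  // e.g. trigger a MetadataRequest
            int leaderEpoch(String topicPartition);
        }

        // Block until the cached metadata is at least as new as the epoch saved
        // with the committed offset, so we cannot fetch from an older leader.
        static long resolveFetchPosition(MetadataView metadata, String topicPartition,
                                         OffsetAndEpoch saved) throws InterruptedException {
            while (metadata.leaderEpoch(topicPartition) < saved.leaderEpoch) {
                metadata.requestRefresh();
                Thread.sleep(100); // back off between refresh attempts
            }
            return saved.offset;   // safe to start fetching from the saved offset
        }

        public static void main(String[] args) throws InterruptedException {
            // Toy metadata view whose epoch advances on each refresh.
            MetadataView metadata = new MetadataView() {
                int epoch = 3;
                public void requestRefresh() { epoch++; }
                public int leaderEpoch(String tp) { return epoch; }
            };
            long position = resolveFetchPosition(metadata, "test-0", new OffsetAndEpoch(42L, 5));
            System.out.println("resume fetching at offset " + position);
        }
    }

(This is only the client-side half; it assumes the brokers' cached versions never go backwards, which is exactly the guarantee Jun calls out above.)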
> > >
> > > Regarding problems (1) and (2): currently we use the version number in the MetadataResponse to ensure that the metadata does not go back in time. There are two alternative solutions to address problems (1) and (2). One solution is for the client to enumerate all partitions in the MetadataResponse, compare their epochs with those in the cached metadata, and reject the MetadataResponse iff any leader epoch is smaller. The main concern is that the MetadataResponse currently carries information for all partitions in the entire cluster, so doing this may slow down the client. The other solution is for the client to enumerate partitions only for topics registered in org.apache.kafka.clients.Metadata, which is an empty set for the producer and the set of subscribed partitions for the consumer. But this degrades to all topics if the consumer subscribes to topics in the cluster by pattern.
> > >
> > > Note that the client will only be forced to update metadata if the version in the MetadataResponse is smaller than the version in the cached metadata. In general this should not be a problem. It can be a problem only if some broker is particularly slow in processing UpdateMetadataRequest compared to other brokers. When this is the case, it means that the broker is also particularly slow in processing LeaderAndIsrRequest, which causes problems anyway because some partition will probably have no leader during this period. I am not sure problems (1) and (2) cause more trouble than what we already have.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Dec 6, 2017 at 6:42 PM, Jun Rao wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Great finding on the issue. It's a real problem. A few comments about the KIP. (1) I am not sure about updating controller_metadata_epoch on every UpdateMetadataRequest. Currently, the controller can send an UpdateMetadataRequest when there is no actual metadata change, and doing this may force unnecessary metadata refreshes on the client. (2) controller_metadata_epoch is global across all topics. This means that a client may be forced to update its metadata even when the metadata for the topics it cares about hasn't changed. (3) It doesn't seem that the KIP handles the corner case when a consumer is restarted. Say a consumer reads from the new leader, commits the offset and then is restarted. On restart, the consumer gets outdated metadata and fetches from the old leader. Then, the consumer will run into the offset-out-of-range issue.
> > > >
> > > > Given the above, I am thinking of the following approach. We actually already have metadata versioning at the partition level: each leader has a leader epoch which is monotonically increasing. We can potentially propagate the leader epoch back in the metadata response and the clients can cache that. This solves issues (1) and (2). To solve (3), when saving an offset, we could save both the offset and the corresponding leader epoch. When fetching the data, the consumer provides both the offset and the leader epoch. A leader will only serve the request if its leader epoch is equal to or greater than the leader epoch from the consumer. To achieve this, we need to change the fetch request protocol and the offset commit API, which requires some more thought.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
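(A minimal sketch of the broker-side check Jun proposes above: the fetch carries the leader epoch the consumer saved with its offset, and the leader serves it only if its own epoch is equal or greater. The request type and decision names are simplified placeholders for this example, not the real Kafka protocol classes or error codes.)

    // Illustrative sketch only: the leader fences fetches that carry a leader
    // epoch newer than its own, instead of answering with an offset-out-of-range
    // error that would make the consumer reset its position.
    public class LeaderEpochFencingSketch {

        static final class FetchPartitionRequest {
            final long fetchOffset;
            final int consumerLeaderEpoch;  // epoch saved with the committed offset
            FetchPartitionRequest(long fetchOffset, int consumerLeaderEpoch) {
                this.fetchOffset = fetchOffset;
                this.consumerLeaderEpoch = consumerLeaderEpoch;
            }
        }

        enum FetchDecision { SERVE, REJECT_STALE_BROKER }

        // Serve only if this broker's leader epoch is >= the consumer's epoch;
        // otherwise the consumer should refresh metadata and retry elsewhere.
        static FetchDecision handleFetch(int brokerLeaderEpoch, FetchPartitionRequest request) {
            if (brokerLeaderEpoch < request.consumerLeaderEpoch) {
                return FetchDecision.REJECT_STALE_BROKER;
            }
            return FetchDecision.SERVE;
        }

        public static void main(String[] args) {
            System.out.println(handleFetch(7, new FetchPartitionRequest(42L, 7))); // SERVE
            System.out.println(handleFetch(6, new FetchPartitionRequest(42L, 7))); // REJECT_STALE_BROKER
        }
    }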
> > > >
> > > > On Wed, Dec 6, 2017 at 10:57 AM, Dong Lin wrote:
> > > >
> > > > > Bump up the thread.
> > > > >
> > > > > It would be great to have more comments on whether we should do it or whether there is a better way to address the motivation of this KIP.
> > > > >
> > > > > On Mon, Dec 4, 2017 at 3:09 PM, Dong Lin wrote:
> > > > >
> > > > > > I don't have an interesting rejected alternative solution to put in the KIP. If there is a good alternative solution from anyone in this thread, I am happy to discuss it and update the KIP accordingly.
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > > On Mon, Dec 4, 2017 at 1:12 PM, Ted Yu wrote:
> > > > > >
> > > > > >> It is clearer now.
> > > > > >>
> > > > > >> I noticed that the Rejected Alternatives section is empty. Have you considered any alternatives?
> > > > > >>
> > > > > >> Cheers
> > > > > >>
> > > > > >> On Mon, Dec 4, 2017 at 1:07 PM, Dong Lin wrote:
> > > > > >>
> > > > > >> > Ted, thanks for catching this. I have updated the sentence to make it readable.
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > Dong
> > > > > >> >
> > > > > >> > On Sat, Dec 2, 2017 at 3:05 PM, Ted Yu wrote:
> > > > > >> >
> > > > > >> > > bq. It the controller_epoch of the incoming MetadataResponse, or if the controller_epoch is the same but the controller_metadata_epoch
> > > > > >> > >
> > > > > >> > > Can you update the above sentence so that the intention is clearer?
> > > > > >> > >
> > > > > >> > > Thanks
> > > > > >> > >
> > > > > >> > > On Fri, Dec 1, 2017 at 6:33 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > > Hi all,
> > > > > >> > > >
> > > > > >> > > > I have created KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field:
> > > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+by+adding+ControllerMetadataEpoch+field
> > > > > >> > > >
> > > > > >> > > > The KIP proposes to add fields to MetadataResponse and UpdateMetadataRequest so that clients can reject outdated metadata and avoid unnecessary OffsetOutOfRangeExceptions. Otherwise there is currently a race condition that can cause the consumer to reset its offset, which negatively affects the consumer's availability.
> > > > > >> > > >
> > > > > >> > > > Feedback and suggestions are welcome!
> > > > > >> > > >
> > > > > >> > > > Regards,
> > > > > >> > > > Dong
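(For reference, one plausible reading of the comparison the sentence Ted quotes is trying to describe, before Dong reworded it in the KIP: treat an incoming MetadataResponse as outdated if its controller_epoch is older than the cached one, or if the controller_epoch is the same but its controller_metadata_epoch is smaller. The class below only illustrates that rule under this assumed interpretation; it is not the actual client code.)

    // Illustrative sketch only: accept a MetadataResponse if it comes from a
    // newer controller, or from the same controller with a metadata epoch that
    // is at least as new as the cached one.
    public class MetadataEpochCheckSketch {

        static final class CachedEpochs {
            final int controllerEpoch;
            final int controllerMetadataEpoch;
            CachedEpochs(int controllerEpoch, int controllerMetadataEpoch) {
                this.controllerEpoch = controllerEpoch;
                this.controllerMetadataEpoch = controllerMetadataEpoch;
            }
        }

        static boolean shouldAccept(CachedEpochs cached, int controllerEpoch, int controllerMetadataEpoch) {
            if (controllerEpoch != cached.controllerEpoch) {
                // A different controller generation: only accept the newer one.
                return controllerEpoch > cached.controllerEpoch;
            }
            // Same controller: reject if the metadata epoch went backwards.
            return controllerMetadataEpoch >= cached.controllerMetadataEpoch;
        }

        public static void main(String[] args) {
            CachedEpochs cached = new CachedEpochs(3, 10);
            System.out.println(shouldAccept(cached, 3, 11)); // true: newer metadata epoch
            System.out.println(shouldAccept(cached, 3, 9));  // false: outdated metadata
            System.out.println(shouldAccept(cached, 4, 0));  // true: newer controller epoch
        }
    }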