From: Jun Rao
To: dev@kafka.apache.org
Date: Thu, 4 Jan 2018 16:02:49 -0800
Subject: Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

Hi, Dong,

Got it. The api that you proposed works. The question is whether that's the api that we want to have in the long term. My concern is that while the api change is simple, the new api seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().

An alternative approach is to make the following compatible api changes in Consumer.

* Add an additional OffsetEpoch field in OffsetAndMetadata. (No need to change the commitSync() api.)
* Add a new api seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old api seek(TopicPartition partition, long offset) in the future.

The alternative approach has a similar amount of api changes as yours but has the following benefits.

1. The api works in a similar way to how offset management works now and is probably what we want in the long term.
2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
3. It can reset offsets better when a topic is recreated.

Thanks,

Jun

On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin wrote:

> Hey Jun,
>
> Yeah I agree that ideally we don't want an ever-growing global metadata version. I just think it may be more desirable to keep the consumer API simple.
>
> In my current proposal, the metadata version returned in the fetch response will be stored together with the offset. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponse and FetchResponse ever received by this consumer.
>
> We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>). If the user calls commitSync(...) to commit offset 10 for a given partition, for most use cases this consumer instance should have consumed the message with offset 9 from this partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If the user calls commitSync(...) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata. Do you think this solution would work?
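[Editor's illustration: a minimal sketch of the external-storage flow Dong describes above. It assumes the metadataEpoch() accessor proposed in this thread (it does not exist in the released KafkaConsumer, and its long return type is assumed here); ExternalStore is a hypothetical placeholder for wherever the application keeps its offsets. The rest is the standard consumer API.]

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ExternalOffsetCommitSketch {

        // Hypothetical stand-in for the external system that stores offsets.
        interface ExternalStore {
            void save(TopicPartition tp, long nextOffset, String metadata);
        }

        static void consumeAndCommitExternally(KafkaConsumer<String, String> consumer,
                                               TopicPartition tp,
                                               ExternalStore store) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            long lastOffset = -1L;
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                // process(record) ...
                lastOffset = record.offset();
            }
            if (lastOffset < 0) {
                return; // nothing consumed, nothing to commit
            }

            // Proposed API (not in the released client): the largest metadata_epoch this
            // consumer has seen across all MetadataResponses and FetchResponses.
            long metadataEpoch = consumer.metadataEpoch();

            // Advanced use case from the message above: encode the epoch next to the
            // offset so a later instance can validate its metadata before seeking.
            store.save(tp, lastOffset + 1, "metadata_epoch=" + metadataEpoch);
        }
    }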
> > By "not sure that I fully understand your latest suggestion", are you > referring to solution related to unclean leader election using leader_epoch > in my previous email? > > Thanks, > Dong > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao wrote: > > > Hi, Dong, > > > > Not sure that I fully understand your latest suggestion. Returning an > ever > > growing global metadata version itself is no ideal, but is fine. My > > question is whether the metadata version returned in the fetch response > > needs to be stored with the offset together if offsets are stored > > externally. If so, we also have to change the consumer API for > commitSync() > > and need to worry about compatibility. If we don't store the metadata > > version together with the offset, on a consumer restart, it's not clear > how > > we can ensure the metadata in the consumer is high enough since there is > no > > metadata version to compare with. > > > > Thanks, > > > > Jun > > > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin wrote: > > > > > Hey Jun, > > > > > > Thanks much for the explanation. > > > > > > I understand the advantage of partition_epoch over metadata_epoch. My > > > current concern is that the use of leader_epoch and the partition_epoch > > > requires us considerable change to consumer's public API to take care > of > > > the case where user stores offset externally. For example, *consumer*. > > > *commitSync*(..) would have to take a map whose value is > metadata, > > > leader epoch, partition epoch>. *consumer*.*seek*(...) would also need > > > leader_epoch and partition_epoch as parameter. Technically we can > > probably > > > still make it work in a backward compatible manner after careful design > > and > > > discussion. But these changes can make the consumer's interface > > > unnecessarily complex for more users who do not store offset > externally. > > > > > > After thinking more about it, we can address all problems discussed by > > only > > > using the metadata_epoch without introducing leader_epoch or the > > > partition_epoch. The current KIP describes the changes to the consumer > > API > > > and how the new API can be used if user stores offset externally. In > > order > > > to address the scenario you described earlier, we can include > > > metadata_epoch in the FetchResponse and the LeaderAndIsrRequest. > Consumer > > > remembers the largest metadata_epoch from all the FetchResponse it has > > > received. The metadata_epoch committed with the offset, either within > or > > > outside Kafka, should be the largest metadata_epoch across all > > > FetchResponse and MetadataResponse ever received by this consumer. > > > > > > The drawback of using only the metadata_epoch is that we can not always > > do > > > the smart offset reset in case of unclean leader election which you > > > mentioned earlier. But in most case, unclean leader election probably > > > happens when consumer is not rebalancing/restarting. In these cases, > > either > > > consumer is not directly affected by unclean leader election since it > is > > > not consuming from the end of the log, or consumer can derive the > > > leader_epoch from the most recent message received before it sees > > > OffsetOutOfRangeException. So I am not sure it is worth adding the > > > leader_epoch to consumer API to address the remaining corner case. What > > do > > > you think? > > > > > > Thanks, > > > Dong > > > > > > > > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao wrote: > > > > > > > Hi, Dong, > > > > > > > > Thanks for the reply. 
> > > > To solve the topic recreation issue, we could use either a global metadata version or a partition-level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated. As you pointed out, we don't need to store the partition epoch in the message. The following is what I am thinking. When a partition is created, we can assign a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets. If a topic is recreated, it's possible that a consumer's offset and leader epoch still match that in the broker, but the partition epoch won't. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > This is a very good example. After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.
> > > > >
> > > > > I think the remaining question is how to address the scenario where the topic is deleted and re-created. One possible solution is to commit the offset with both the leader epoch and the metadata version. The logic and the implementation of this solution do not require a new concept (e.g. partition epoch) and do not require any change to the message format or leader epoch. It also allows us to order the metadata in a straightforward manner, which may be useful in the future. So it may be a better solution than generating a random partition epoch every time we create a partition. Does this sound reasonable?
> > > > >
> > > > > Previously one concern with using the metadata version was that the consumer will be forced to refresh metadata even if the metadata version is increased due to topics that the consumer is not interested in. Now I realized that this is probably not a problem. Currently the client will refresh metadata either due to an InvalidMetadataException in the response from the broker or due to metadata expiry. The addition of the metadata version should not increase the overhead of metadata refresh caused by InvalidMetadataException. If the client refreshes metadata due to expiry and receives metadata whose version is lower than the current metadata version, we can reject the metadata but still reset the metadata age, which essentially keeps the existing behavior in the client.
> > > > >
> > > > > Thanks much,
> > > > > Dong
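[Editor's illustration: a sketch of the refresh rule in Dong's last paragraph above, i.e. reject stale metadata by version but still reset the metadata age. The class and field names are hypothetical and do not correspond to actual Kafka client internals.]

    import org.apache.kafka.common.Cluster;

    public class VersionedMetadataCache {
        private long currentVersion = -1L;
        private long lastSuccessfulRefreshMs = 0L;
        private Cluster cluster = Cluster.empty();

        // Called for every MetadataResponse, whether the refresh was triggered by an
        // InvalidMetadataException or by metadata expiry.
        public synchronized void update(long responseVersion, Cluster newCluster, long nowMs) {
            if (responseVersion < currentVersion) {
                // Stale response: keep the existing metadata, but treat the refresh as
                // done so the existing expiry behavior in the client is preserved.
                lastSuccessfulRefreshMs = nowMs;
                return;
            }
            currentVersion = responseVersion;
            cluster = newCluster;
            lastSuccessfulRefreshMs = nowMs;
        }
    }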