From: Jun Rao
Date: Fri, 6 Jan 2017 10:05:31 -0800
Subject: Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in
AdminClient
To: dev@kafka.apache.org

Hi, Dong,

Thanks for the proposal. Looks good overall. A couple of comments.

1. Where is the low_watermark checkpointed? Is that in
replication-offset-checkpoint? If so, do we need to bump up the version?
Could you also describe the format change?

2. For topics with "delete" retention, currently we let each replica delete
old segments independently. With low_watermark, we could instead let the
leader delete old segments through the deletion policy and have the
followers simply delete old segments based on low_watermark. Not sure if
this saves much, but it is a potential option that may be worth thinking
about.

Jun

On Wed, Jan 4, 2017 at 8:13 AM, radai wrote:

> One more example of complicated config: mirror maker.
>
> We definitely can't trust each and every topic owner to configure their
> topics not to purge before they've been mirrored. That would mean there is
> a per-topic config (set by the owner) and a "global" config (where mirror
> makers are specified), and the two need to be "merged" for those topics
> that _are_ mirrored, a changing set of topics stored in an external system
> outside of Kafka. If a topic is taken out of the mirror set, the MM offset
> would be "frozen" at that point and prevent clean-up for all eternity,
> unless it is cleaned up itself.
>
> ...
>
> complexity :-)
>
> On Wed, Jan 4, 2017 at 8:04 AM, radai wrote:
>
> > In summary: I'm not opposed to the idea of a per-topic clean-up config
> > that tracks some set of consumer groups' offsets (which would probably
> > work for 80% of use cases), but I definitely see a need to expose a
> > simple API for the more advanced/obscure/custom use cases (the other
> > 20%).
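[Editorial aside: Jun's point 2 above can be illustrated with a small model.
A follower that knows the leader's low_watermark would delete exactly those
log segments lying entirely below it, making no independent retention
decision of its own. The sketch below is hypothetical; the segment
representation and function name are invented for illustration and are not
Kafka's actual log code.]

```python
# Hypothetical model of follower-side segment deletion driven by the
# leader's low_watermark (names invented for illustration only).

def segments_to_delete(segments, low_watermark):
    """Return the segments whose every offset lies below low_watermark.

    Each segment is a (base_offset, next_offset) pair, where next_offset
    is one past the segment's last message. A segment is eligible only if
    next_offset <= low_watermark, i.e. the whole segment has been purged.
    """
    return [s for s in segments if s[1] <= low_watermark]

# A log with three segments covering offsets 0-99, 100-199, 200-299.
log = [(0, 100), (100, 200), (200, 300)]

# If the leader advanced low_watermark to 150, only the first segment is
# entirely below it; the second still holds live offsets 150-199.
print(segments_to_delete(log, 150))   # [(0, 100)]
```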
> > On Wed, Jan 4, 2017 at 7:54 AM, radai wrote:
> >
> >> A major motivation for this KIP is cost savings.
> >>
> >> Lots of internal systems at LI use Kafka as an intermediate pipe and
> >> set the topic retention period to a "safe enough" amount of time to be
> >> able to recover from crashes/downtime and catch up to "now". This
> >> typically results in a few days' worth of retention. However, under
> >> normal operating conditions the consumers are mostly caught up, so
> >> early clean-up enables a big cost saving in storage.
> >>
> >> As for my points:
> >>
> >> 1. When discussing implementation options for automatic clean-up, we
> >> realized that cleaning up by tracking offsets stored in Kafka requires
> >> some per-topic config: you need to specify which groups to track. This
> >> becomes a problem because:
> >>   1.1 - it is relatively complicated code, to be written in the broker.
> >>   1.2 - the configuration needs to be kept up to date by topic
> >> "owners", of which we have thousands. Failure to do so would decrease
> >> the cost benefit.
> >>   1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
> >> workflow where they reset their offsets to an earlier value than the
> >> one stored. This means that a stored offset of X does not always mean
> >> you can clean up to X-1. Think of it as video encoding: some apps have
> >> "key frames" before their current offset that they may seek back to.
> >>   1.4 - there are multiple possible strategies: you could clean up
> >> aggressively, retain some "time distance" from latest, some "offset
> >> distance", etc. We think this would have made it very hard to agree on
> >> a single "correct" implementation that everyone would be happy with.
> >> It would be better to include the raw functionality in the API and
> >> leave the "brains" to an external monitoring system where people could
> >> custom-tailor their logic.
> >>
> >> 2.
> >> ad-hoc consumer groups: it is common practice for devs to spin up
> >> console consumers and connect to a topic as a debug aid; SREs may also
> >> do this. There are also various other ecosystem applications that may
> >> consume from topics unknown to topic owners (as those are infra
> >> monitoring tools). Obviously such consumer groups' offsets should be
> >> ignored for purposes of clean-up, but coming up with a bullet-proof
> >> way to do this is non-trivial and again ties into the implementation
> >> complexity and inflexibility of a "one size fits all" solution in 1.4
> >> above.
> >>
> >> 3. forceful clean-up: we have workflows that use Kafka to move
> >> gigantic blobs from offline Hadoop processing flows into online
> >> systems. The data being "loaded" into such an online system can be
> >> several GBs in size and take a long time to consume (it is sliced into
> >> many small msgs). Sometimes the sender wants to abort and start a new
> >> blob before the current load process has completed, meaning the
> >> consumer's offsets are not yet caught up.
> >>
> >> 4. offsets outside of Kafka: yes, you could force applications to
> >> store their offsets twice, but that's inefficient. It is better to
> >> expose a raw, simple API and let such applications manage their own
> >> clean-up logic (this again ties into 1.4 and no "one size fits all"
> >> solution).
> >>
> >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin wrote:
> >>
> >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava
> >>> <ewen@confluent.io> wrote:
> >>>
> >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin wrote:
> >>> >
> >>> > > Hey Ewen,
> >>> > >
> >>> > > Thanks for the review. As Radai explained, it would be complex in
> >>> > > terms of user configuration if we were to use committed offsets
> >>> > > to decide data deletion. We need a way to specify which groups
> >>> > > need to consume data of this partition.
> >>> > > The broker would also need to consume the entire offsets topic
> >>> > > in that approach, which has some overhead. I don't think it is
> >>> > > that hard to implement, but it will likely take more time to
> >>> > > discuss that approach due to the new config and the server-side
> >>> > > overhead.
> >>> > >
> >>> > > We chose to put this API in AdminClient because it is more like
> >>> > > an administrative operation (such as listGroups, deleteTopics)
> >>> > > than a consumer operation. It is not necessarily called by
> >>> > > consumers only. For example, we can implement the "delete data
> >>> > > before committed offset" approach by running an external service
> >>> > > which calls the purgeDataBefore() API based on the committed
> >>> > > offsets of consumer groups.
> >>> > >
> >>> > > I am not aware that AdminClient is not a public API. Supposing it
> >>> > > is not public now, I assume we plan to make it public in the
> >>> > > future as part of KIP-4. Are we not making it public because its
> >>> > > interface is not stable? If so, can we just tag this new API as
> >>> > > not stable in the code?
> >>> >
> >>> > The AdminClient planned for KIP-4 is a new Java-based
> >>> > implementation. It's definitely confusing that both will be (could
> >>> > be?) named AdminClient, but we've kept the existing Scala
> >>> > AdminClient out of the public API and have not required KIPs for
> >>> > changes to it.
> >>> >
> >>> > That said, I agree that if this type of API makes it into Kafka,
> >>> > having a (new, Java-based) AdminClient method would definitely be a
> >>> > good idea. An alternative path might be to have a Consumer-based
> >>> > implementation, since that seems like a very intuitive, natural way
> >>> > to use the protocol. I think optimizing for the expected use case
> >>> > would be a good idea.
> >>> >
> >>> > -Ewen
> >>>
> >>> Are you saying that the Scala AdminClient is not a public API and
> >>> that we discourage adding any new feature to this class?
> >>>
> >>> I still prefer to add it to AdminClient (the Java version in the
> >>> future and the Scala version in the short term) because I feel it
> >>> belongs to admin operations rather than the KafkaConsumer interface.
> >>> For example, if in the future we implement the "delete data before
> >>> committed offset" strategy in an external service, it would be a bit
> >>> awkward if the service had to instantiate a KafkaConsumer and call
> >>> KafkaConsumer.purgeDataBefore(...) to purge data. In other words, our
> >>> expected use case doesn't necessarily bind this API to the consumer.
> >>>
> >>> I am not strongly opinionated on this issue. Let's see what other
> >>> committers/developers think about it.
> >>>
> >>> > > Thanks,
> >>> > > Dong
> >>> > >
> >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava
> >>> > > <ewen@confluent.io> wrote:
> >>> > >
> >>> > > > Dong,
> >>> > > >
> >>> > > > Looks like that's an internal link;
> >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> >>> > > > is the right one.
> >>> > > >
> >>> > > > I have a question about one of the rejected alternatives:
> >>> > > >
> >>> > > > > Using committed offset instead of an extra API to trigger
> >>> > > > > data purge operation.
> >>> > > >
> >>> > > > The KIP says this would be more complicated to implement. Why
> >>> > > > is that? I think brokers would have to consume the entire
> >>> > > > offsets topic, but the data stored in memory doesn't seem to
> >>> > > > change, and applying this when updated offsets are seen seems
> >>> > > > basically the same.
> >>> > > > It might also be possible to make it work even with multiple
> >>> > > > consumer groups if that was desired (although that would
> >>> > > > require tracking more data in memory), as a generalization,
> >>> > > > without requiring coordination between the consumer groups.
> >>> > > > Given the motivation, I'm assuming this was considered
> >>> > > > unnecessary since this specifically targets intermediate
> >>> > > > stream-processing topics.
> >>> > > >
> >>> > > > Another question: why expose this via AdminClient (which isn't
> >>> > > > a public API afaik)? Why not, for example, expose it on the
> >>> > > > Consumer, which is presumably where you'd want access to it,
> >>> > > > since the functionality depends on the consumer actually
> >>> > > > having consumed the data?
> >>> > > >
> >>> > > > -Ewen
> >>> > > >
> >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin wrote:
> >>> > > >
> >>> > > > > Hi all,
> >>> > > > >
> >>> > > > > We created KIP-107 to propose the addition of a
> >>> > > > > purgeDataBefore() API in AdminClient.
> >>> > > > >
> >>> > > > > Please find the KIP wiki at https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal.
> >>> > > > > We would love to hear your comments and suggestions.
> >>> > > > >
> >>> > > > > Thanks,
> >>> > > > > Dong
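[Editorial aside: the external-service pattern Dong describes above (track
the committed offsets of an allow-listed set of groups, then purge up to
their minimum) can be sketched as below. The group allow-list, the safety
margin accommodating radai's "key frame" rewinds from point 1.3, and the
purge_data_before callback are all invented for illustration; the concrete
API shape was still under discussion in this thread.]

```python
# Hypothetical external clean-up service in the style Dong describes: it
# computes a safe purge offset per partition from the committed offsets of
# an allow-listed set of consumer groups, then invokes a purge callback
# standing in for the proposed purgeDataBefore() API. All names invented.

def safe_purge_offset(committed, tracked_groups, margin=0):
    """Minimum committed offset across tracked groups, minus a margin.

    committed maps group name -> committed offset for one partition.
    Ad-hoc/debug groups not in tracked_groups are ignored (radai's point
    2); margin leaves room for "key frame" rewinds (radai's point 1.3).
    Returns None if any tracked group has not committed yet.
    """
    offsets = [committed.get(g) for g in tracked_groups]
    if any(o is None for o in offsets):
        return None  # cannot purge safely until every tracked group commits
    return max(min(offsets) - margin, 0)

def run_cleanup(committed, tracked_groups, purge_data_before, margin=0):
    """One clean-up pass: compute the target and purge if it is positive."""
    target = safe_purge_offset(committed, tracked_groups, margin)
    if target is not None and target > 0:
        purge_data_before(target)
    return target

# The ad-hoc console consumer's lagging offset does not block clean-up.
committed = {"etl-loader": 500, "audit": 420, "console-consumer-123": 7}
purged = []
run_cleanup(committed, {"etl-loader", "audit"}, purged.append, margin=20)
print(purged)   # [400]: min(500, 420) - 20; the ad-hoc group is ignored
```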