From: radai
Date: Wed, 4 Jan 2017 08:04:45 -0800
Subject: Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient
To: dev@kafka.apache.org

in summary - i'm not opposed to the idea of a per-topic clean-up config that tracks some set of consumer groups' offsets (which would probably work for 80% of use cases), but i definitely see a need to expose a simple API for the more advanced/obscure/custom use cases (the other 20%).
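to make that concrete - here's a rough sketch of the kind of call site i have in mind. to be clear, this is only a sketch: the exact purgeDataBefore() signature is still under discussion, so the stub interface below is an assumption, and the topic name and offset are made up:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    public class PurgeSketch {
        // stand-in for the proposed AdminClient method from KIP-107;
        // the real signature is not final and may well differ
        interface PurgingAdminClient {
            void purgeDataBefore(Map<TopicPartition, Long> offsetForPartition);
        }

        static void purgeCaughtUpData(PurgingAdminClient admin) {
            // the application has decided, by whatever logic it likes, that
            // everything before offset 12000 on partition 0 is safe to delete
            Map<TopicPartition, Long> purgeBefore = new HashMap<>();
            purgeBefore.put(new TopicPartition("intermediate-topic", 0), 12000L);
            admin.purgeDataBefore(purgeBefore);
        }
    }

the point being that the caller, not the broker, decides what "safe to delete" means.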
On Wed, Jan 4, 2017 at 7:54 AM, radai wrote:

> a major motivation for this KIP is cost savings.
>
> lots of internal systems at LI use kafka as an intermediate pipe, and set the topic retention period to a "safe enough" amount of time to be able to recover from crashes/downtime and catch up to "now". this typically results in a few days' worth of retention.
>
> however, under normal operating conditions the consumers are mostly caught up, so early clean-up enables big cost savings in storage.
>
> as for my points:
>
> 1. when discussing implementation options for automatic clean-up we realized that cleaning up by keeping track of offsets stored in kafka requires some per-topic config - you need to specify which groups to track. this becomes a problem because:
> 1.1 - it means relatively complicated code that would have to live in the broker.
> 1.2 - the configuration needs to be kept up to date by topic "owners" - of which we have thousands - and failure to do so would erode the cost benefit.
> 1.3 - some applications have a "reconsume" / "reinit" / "bootstrap" workflow where they reset their offsets to an earlier value than the one stored. this means that a stored offset of X does not always mean you can clean up to X-1. think of it like video encoding - some apps have "key frames" they may seek back to which are before their current offset.
> 1.4 - there are multiple possible strategies - you could clean up aggressively, retain some "time distance" from latest, some "offset distance", etc. we think this would make it very hard to agree on a single "correct" implementation that everyone would be happy with. it would be better to include the raw functionality in the API and leave the "brains" to an external monitoring system where people can custom-tailor their logic (see the sketch after point 4 below).
>
> 2. ad-hoc consumer groups: it's common practice for devs to spin up console consumers and connect to a topic as a debug aid. SREs may also do this. there are also various other ecosystem applications that may consume from topics unknown to the topic owners (infra monitoring tools, for example). obviously such consumer groups' offsets should be ignored for purposes of clean-up, but coming up with a bullet-proof way to do this is non-trivial, and again ties into the implementation complexity and the inflexibility of a "one size fits all" solution in 1.4 above.
>
> 3. forceful clean-up: we have workflows that use kafka to move gigantic blobs from offline hadoop processing flows into online systems. the data being "loaded" into such an online system can be several GBs in size and take a long time to consume (the blobs are sliced into many small messages). sometimes the sender wants to abort and start a new blob before the current load process has completed - meaning the consumer's offsets are not yet caught up.
>
> 4. offsets outside of kafka: yes, you could force applications to store their offsets twice, but that's inefficient. it's better to expose a raw, simple API and let such applications manage their own clean-up logic (this again ties into 1.4 and the lack of a "one size fits all" solution).
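> as promised in 1.4 above, a rough sketch of the kind of external clean-up service i mean: poll the committed offsets of a known set of groups and purge everything below the minimum. the group handling, bootstrap servers and topic are made-up examples, and purgeDataBefore() is again only the proposed (not yet final) API:
>
>     import java.util.List;
>     import java.util.Properties;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>     import org.apache.kafka.common.TopicPartition;
>
>     public class OffsetBasedCleaner {
>         // smallest committed offset across the groups we track, or -1 if any
>         // tracked group has no committed offset yet (in which case purge nothing)
>         static long minCommittedOffset(List<String> groups, TopicPartition tp) {
>             long min = Long.MAX_VALUE;
>             for (String group : groups) {
>                 Properties props = new Properties();
>                 props.put("bootstrap.servers", "localhost:9092");
>                 props.put("group.id", group);
>                 props.put("key.deserializer",
>                     "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>                 props.put("value.deserializer",
>                     "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>                 // one consumer per group just to read its committed offset;
>                 // a real service would cache these
>                 try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>                     OffsetAndMetadata committed = consumer.committed(tp);
>                     if (committed == null) return -1L;
>                     min = Math.min(min, committed.offset());
>                 }
>             }
>             return min;
>         }
>     }
>
> the service would then call purgeDataBefore() with that minimum (minus whatever safety margin - "time distance", "offset distance" - the owner wants) for each partition. that policy part is exactly what's hard to bake into the broker.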
> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin wrote:
>
>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava wrote:
>>
>>> On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin wrote:
>>>
>>>> Hey Ewen,
>>>>
>>>> Thanks for the review. As Radai explained, it would be complex in terms of user configuration if we were to use committed offsets to decide data deletion. We need a way to specify which groups need to consume data of this partition. The broker would also need to consume the entire offsets topic in that approach, which has some overhead. I don't think it is that hard to implement, but it will likely take more time to discuss that approach due to the new config and the server-side overhead.
>>>>
>>>> We chose to put this API in AdminClient because the API is more like an administrative operation (such as listGroups, deleteTopics) than a consumer operation. It is not necessarily called by consumers only. For example, we can implement the "delete data before committed offset" approach by running an external service which calls the purgeDataBefore() API based on the committed offsets of consumer groups.
>>>>
>>>> I am not aware that AdminClient is not a public API. Supposing it is not public now, I assume we plan to make it public in the future as part of KIP-4. Are we not making it public because its interface is not stable? If so, can we just tag this new API as not stable in the code?
>>>
>>> The AdminClient planned for KIP-4 is a new Java-based implementation. It's definitely confusing that both will be (could be?) named AdminClient, but we've kept the existing Scala AdminClient out of the public API and have not required KIPs for changes to it.
>>>
>>> That said, I agree that if this type of API makes it into Kafka, having a (new, Java-based) AdminClient method would definitely be a good idea. An alternative path might be to have a Consumer-based implementation, since that seems like a very intuitive, natural way to use the protocol. I think optimizing for the expected use case would be a good idea.
>>>
>>> -Ewen
>>
>> Are you saying that the Scala AdminClient is not a public API and we discourage addition of any new feature to this class?
>>
>> I still prefer to add it to AdminClient (the Java version in the future and the Scala version in the short term) because I feel it belongs to admin operations rather than the KafkaConsumer interface. For example, if in the future we implement the "delete data before committed offset" strategy in an external service, it would be a bit awkward if the service had to instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(...) to purge data. In other words, our expected use case doesn't necessarily bind this API to the consumer.
>>
>> I don't feel strongly about this issue. Let's see what other committers/developers think about it.
>>
>>>> Thanks,
>>>> Dong
>>>>
>>>> On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
>>>>
>>>>> Dong,
>>>>>
>>>>> Looks like that's an internal link. https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient is the right one.
>>>>>
>>>>> I have a question about one of the rejected alternatives:
>>>>>
>>>>>> Using committed offset instead of an extra API to trigger data purge operation.
>>>>>
>>>>> The KIP says this would be more complicated to implement. Why is that? I think brokers would have to consume the entire offsets topic, but the data stored in memory doesn't seem to change, and applying this when updated offsets are seen seems basically the same. It might also be possible to make it work even with multiple consumer groups if that was desired (although that'd require tracking more data in memory) as a generalization, without requiring coordination between the consumer groups. Given the motivation, I'm assuming this was considered unnecessary since this specifically targets intermediate stream-processing topics.
>>>>>
>>>>> Another question is why expose this via AdminClient (which isn't public API afaik)? Why not, for example, expose it on the Consumer, which is presumably where you'd want access to it, since the functionality depends on the consumer actually having consumed the data?
>>>>>
>>>>> -Ewen
>>>>>
>>>>> On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> We created KIP-107 to propose the addition of a purgeDataBefore() API in AdminClient.
>>>>>>
>>>>>> Please find the KIP wiki at https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal. We would love to hear your comments and suggestions.
>>>>>>
>>>>>> Thanks,
>>>>>> Dong