From: radai
Date: Wed, 4 Jan 2017 07:54:22 -0800
Subject: Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient
To: dev@kafka.apache.org

a major motivation for this KIP is cost savings. lots of internal systems at LI use kafka as an intermediate pipe, and set the topic retention period to a "safe enough" amount of time to be able to recover from crashes/downtime and catch up to "now". this typically results in a few days' worth of retention. however, under normal operating conditions the consumers are mostly caught up, so early clean-up enables big cost savings in storage.

as for my points:

1. when discussing implementation options for automatic clean-up, we realized that cleaning up by tracking offsets stored in kafka requires some per-topic config - you need to specify which groups to track. this becomes a problem because:

1.1 - it is relatively complicated code, to be written in the broker.

1.2 - the configuration needs to be kept up to date by topic "owners" - of which we have thousands. failure to do so would decrease the cost benefit.

1.3 - some applications have a "reconsume" / "reinit" / "bootstrap" workflow where they reset their offsets to an earlier value than the one stored. this means that a stored offset of X does not always mean you can clean up to X-1. think of it as video encoding - some apps have "key frames" they may seek back to which are before their current offset.

1.4 - there are multiple possible strategies - you could clean up aggressively, retain some "time distance" from latest, some "offset distance", etc. we think this would have made it very hard to agree on a single "correct" implementation that everyone would be happy with. it would be better to include the raw functionality in the API and leave the "brains" to an external monitoring system where people could custom-tailor their logic.

2.
ad-hoc consumer groups: it's common practice for devs to spin up console consumers and connect to a topic as a debug aid. SREs may also do this. there are also various other ecosystem applications that may consume from topics (unknown to topic owners, as those are infra monitoring tools). obviously such consumer groups' offsets should be ignored for purposes of clean-up, but coming up with a bullet-proof way to do this is non-trivial, and again ties into the implementation complexity and inflexibility of a "one size fits all" solution in 1.4 above.

3. forceful clean-up: we have workflows that use kafka to move gigantic blobs from offline hadoop processing flows into online systems. the data being "loaded" into such an online system can be several GBs in size and take a long time to consume (it is sliced into many small msgs). sometimes the sender wants to abort and start a new blob before the current load process has completed - meaning the consumer's offsets are not yet caught up.

4. offsets outside of kafka: yes, you could force applications to store their offsets twice, but that's inefficient. it's better to expose a raw, simple API and let such applications manage their own clean-up logic (this again ties into 1.4 and no "one size fits all" solution).

On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin wrote:

> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava wrote:
>
> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin wrote:
> >
> > > Hey Ewen,
> > >
> > > Thanks for the review. As Radai explained, it would be complex in terms
> > > of user configuration if we were to use committed offset to decide data
> > > deletion. We need a way to specify which groups need to consume data of
> > > this partition. The broker will also need to consume the entire offsets
> > > topic in that approach, which has some overhead. I don't think it is that
> > > hard to implement. But it will likely take more time to discuss that
> > > approach due to the new config and the server side overhead.
> > >
> > > We choose to put this API in AdminClient because the API is more like an
> > > administrative operation (such as listGroups, deleteTopics) than a
> > > consumer operation. It is not necessarily called by the consumer only.
> > > For example, we can implement the "delete data before committed offset"
> > > approach by running an external service which calls the purgeDataBefore()
> > > API based on the committed offsets of consumer groups.
> > >
> > > I am not aware that AdminClient is not a public API. Suppose it is not
> > > public now, I assume we plan to make it public in the future as part of
> > > KIP-4. Are we not making it public because its interface is not stable?
> > > If so, can we just tag this new API as not stable in the code?
> >
> > The AdminClient planned for KIP-4 is a new Java-based implementation. It's
> > definitely confusing that both will be (could be?) named AdminClient, but
> > we've kept the existing Scala AdminClient out of the public API and have
> > not required KIPs for changes to it.
> >
> > That said, I agree that if this type of API makes it into Kafka, having a
> > (new, Java-based) AdminClient method would definitely be a good idea. An
> > alternative path might be to have a Consumer-based implementation, since
> > that seems like a very intuitive, natural way to use the protocol. I think
> > optimizing for the expected use case would be a good idea.
> >
> > -Ewen
>
> Are you saying that the Scala AdminClient is not a public API and we
> discourage addition of any new feature to this class?
>
> I still prefer to add it to AdminClient (the Java version in the future and
> the Scala version in the short term) because I feel it belongs to admin
> operation instead of the KafkaConsumer interface.
> For example, if in the future we implement the "delete data before
> committed offset" strategy in an external service, I feel it is a bit
> awkward if the service has to instantiate a KafkaConsumer and call
> KafkaConsumer.purgeDataBefore(...) to purge data. In other words, our
> expected use-case doesn't necessarily bind this API with the consumer.
>
> I am not strong on this issue. Let's see what other committers/developers
> think about this.
>
> > > Thanks,
> > > Dong
> > >
> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ewen@confluent.io>
> > > wrote:
> > >
> > > > Dong,
> > > >
> > > > Looks like that's an internal link,
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > > > is the right one.
> > > >
> > > > I have a question about one of the rejected alternatives:
> > > >
> > > > > Using committed offset instead of an extra API to trigger data purge
> > > > > operation.
> > > >
> > > > The KIP says this would be more complicated to implement. Why is that?
> > > > I think brokers would have to consume the entire offsets topic, but the
> > > > data stored in memory doesn't seem to change, and applying this when
> > > > updated offsets are seen seems basically the same. It might also be
> > > > possible to make it work even with multiple consumer groups if that was
> > > > desired (although that'd require tracking more data in memory) as a
> > > > generalization without requiring coordination between the consumer
> > > > groups. Given the motivation, I'm assuming this was considered
> > > > unnecessary since this specifically targets intermediate stream
> > > > processing topics.
> > > >
> > > > Another question is why expose this via AdminClient (which isn't
> > > > public API afaik)?
> > > > Why not, for example, expose it on the Consumer, which is presumably
> > > > where you'd want access to it, since the functionality depends on the
> > > > consumer actually having consumed the data?
> > > >
> > > > -Ewen
> > > >
> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > We created KIP-107 to propose addition of purgeDataBefore() API in
> > > > > AdminClient.
> > > > >
> > > > > Please find the KIP wiki in the link
> > > > > https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal
> > > > > We would love to hear your comments and suggestions.
> > > > >
> > > > > Thanks,
> > > > > Dong
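The "raw API plus external brains" split argued for in point 1.4 above, and Dong's external-service example, can be sketched in a few lines. This is only a rough illustration: the class, the `safePurgeOffsets` method, and the group-to-offset inputs are assumptions made up for this sketch, not the KIP-107 interface or any real Kafka API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch of an external clean-up service deciding how far to purge.
// The broker would only expose a raw purge-by-offset call; the policy
// ("brains") lives here and can be tailored per organization.
public class PurgePolicySketch {

    // For each partition, the safe purge offset is the minimum committed
    // offset across the *tracked* groups only (ad-hoc/debug groups from
    // point 2 are simply never passed in), minus a safety margin that
    // leaves room for "reconsume"/"bootstrap" workflows that seek back
    // behind their committed offset (point 1.3).
    static Map<String, Long> safePurgeOffsets(
            Map<String, Map<String, Long>> committedByGroup, // group -> (partition -> offset)
            long safetyMargin) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partitionOffsets : committedByGroup.values()) {
            for (Map.Entry<String, Long> e : partitionOffsets.entrySet()) {
                long candidate = Math.max(0L, e.getValue() - safetyMargin);
                result.merge(e.getKey(), candidate, Math::min); // keep the slowest group
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> committed = new HashMap<>();
        committed.put("etl-loader", Map.of("logs-0", 5000L, "logs-1", 7000L));
        committed.put("audit",      Map.of("logs-0", 4200L, "logs-1", 9000L));

        // with a margin of 200: logs-0 may purge up to 4000 (audit is slowest),
        // logs-1 up to 6800 (etl-loader is slowest)
        Map<String, Long> purge = safePurgeOffsets(committed, 200L);
        System.out.println(new TreeMap<>(purge)); // prints {logs-0=4000, logs-1=6800}
        // the service would then hand `purge` to the proposed broker API,
        // e.g. something shaped like adminClient.purgeDataBefore(purge)
    }
}
```

A "time distance" or "offset distance" policy (the other strategies listed in 1.4) would be a different body for `safePurgeOffsets` with the same output shape, which is exactly why pushing the policy out of the broker avoids having to pick one blessed strategy.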