kafka-dev mailing list archives

From radai <radai.rosenbl...@gmail.com>
Subject Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient
Date Wed, 04 Jan 2017 16:13:07 GMT
one more example of complicated config - mirror maker.

we definitely can't trust each and every topic owner to configure their
topics not to purge before they've been mirrored.
that would mean there's a per-topic config (set by the owner) and a
"global" config (where mirror makers are specified), and the two need to be
"merged" for those topics that _are_ mirrored.
the mirrored set is itself a changing set of topics that's stored in an
external system outside of kafka.
if a topic is taken out of the mirror set, the MM offset would be "frozen"
at that point and prevent clean-up for all eternity, unless that offset is
itself cleaned up.

...

complexity :-)
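
A rough sketch of the merge described above, in Python purely for illustration. Nothing here is a Kafka API: `purge_offset`, `owner_groups`, `mirrored_topics` and `mirror_groups` are hypothetical names, and the sketch assumes one partition per topic to stay short. The mirror set is passed in fresh on every call, so a topic that has left the set is no longer pinned by a stale MM offset.

```python
from typing import Dict, Optional, Set


def purge_offset(
    topic: str,
    owner_groups: Dict[str, Set[str]],     # per-topic config, set by topic owners
    mirrored_topics: Set[str],             # current mirror set (from the external system)
    mirror_groups: Set[str],               # "global" config: mirror maker group ids
    committed: Dict[str, Dict[str, int]],  # group -> topic -> committed offset
) -> Optional[int]:
    """Return the offset before which data could be purged, or None if unknown."""
    # Merge the owner's groups with the mirror makers, but only for mirrored topics.
    groups = set(owner_groups.get(topic, set()))
    if topic in mirrored_topics:
        groups |= mirror_groups
    if not groups:
        return None  # nobody is tracked, so there is no basis for purging

    offsets = []
    for group in groups:
        offset = committed.get(group, {}).get(topic)
        if offset is None:
            return None  # a tracked group has never committed: purge nothing
        offsets.append(offset)
    # The slowest tracked group bounds what can be removed.
    return min(offsets)


committed = {"app": {"t": 500}, "mm": {"t": 120}}
owner_groups = {"t": {"app"}}
print(purge_offset("t", owner_groups, {"t"}, {"mm"}, committed))   # 120
print(purge_offset("t", owner_groups, set(), {"mm"}, committed))   # 500
```

The second call shows the topic after it has been removed from the mirror set: the old MM offset of 120 is still stored, but it no longer holds clean-up back.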

On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenblatt@gmail.com> wrote:

> in summary - i'm not opposed to the idea of a per-topic clean up config
> that tracks some set of consumer groups' offsets (which would probably work
> for 80% of use cases), but i definitely see a need to expose a simple API
> for the more advanced/obscure/custom use cases (the other 20%).
>
> On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenblatt@gmail.com> wrote:
>
>> a major motivation for this KIP is cost savings.
>>
>> lots of internal systems at LI use kafka as an intermediate pipe, and set
>> the topic retention period to a "safe enough" amount of time to be able to
>> recover from crashes/downtime and catch up to "now". this results in a few
>> days' worth of retention typically.
>>
>> however, under normal operating conditions the consumers are mostly
>> caught-up and so early clean-up enables a big cost savings in storage.
>>
>> as for my points:
>>
>> 1. when discussing implementation options for automatic clean-up we
>> realized that cleaning up by keeping track of offsets stored in kafka
>> requires some per-topic config - you need to specify which groups to track.
>> this becomes a problem because:
>>     1.1 - relatively complicated code, to be written in the broker.
>>     1.2 - configuration needs to be maintained up to date by topic
>> "owners" - of which we have thousands. failure to do so would decrease the
>> cost benefit.
>>     1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
>> workflow where they will reset their offsets to an earlier value than the
>> one stored. this means that a stored offset of X does not always mean you
>> can clean up to X-1. think of it as video encoding - some apps have "key
>> frames" they may seek back to which are before their current offset.
>>     1.4 - there are multiple possible strategies - you could clean up
>> aggressively, retain some "time distance" from latest, some "offset
>> distance", etc. this we think would have made it very hard to agree on a
>> single "correct" implementation that everyone would be happy with. it would
>> be better to include the raw functionality in the API and leave the
>> "brains" to an external monitoring system where people could custom-tailor
>> their logic.
>>
>> 2. ad-hoc consumer groups: it's common practice for devs to spin up
>> console consumers and connect to a topic as a debug aid. SREs may also do
>> this. there are also various other eco-system applications that may
>> consume from topics (unknown to topic owners as those are infra monitoring
>> tools). obviously such consumer-groups' offsets should be ignored for
>> purposes of clean-up, but coming up with a bullet-proof way to do this is
>> non-trivial and again ties with implementation complexity and inflexibility
>> of a "one size fits all" solution in 1.4 above.
>>
>> 3. forceful clean-up: we have workflows that use kafka to move gigantic
>> blobs from offline hadoop processing flows into systems. the data being
>> "loaded" into such an online system can be several GBs in size and take a
>> long time to consume (they are sliced into many small msgs). sometimes the
>> sender wants to abort and start a new blob before the current load process
>> has completed - meaning the consumer's offsets are not yet caught up.
>>
>> 4. offsets outside of kafka: yes, you could force applications to store
>> their offsets twice, but that's inefficient. it's better to expose a raw,
>> simple API and let such applications manage their own clean-up logic (this
>> again ties into 1.4 and no "one size fits all" solution)
>>
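
To make points 1.4 and 2 above concrete, here is a minimal sketch of how an external service might keep the "brains" pluggable. It is an illustration only: `plan_purge`, `aggressive` and `offset_distance` are hypothetical names, and the `console-consumer-` prefix filter is an assumption about how ad-hoc groups might be named, not a bullet-proof way to detect them.

```python
from typing import Callable, Dict, Optional, Tuple

# A policy maps (lowest tracked committed offset, log end offset) to a purge offset.
Policy = Callable[[int, int], int]


def aggressive(min_committed: int, log_end: int) -> int:
    """Purge everything the slowest tracked group has already consumed."""
    return min_committed


def offset_distance(distance: int) -> Policy:
    """Always keep at least `distance` offsets behind the log end."""
    def policy(min_committed: int, log_end: int) -> int:
        return max(0, min(min_committed, log_end - distance))
    return policy


def plan_purge(
    committed: Dict[str, int],  # group id -> committed offset for one partition
    log_end: int,
    policy: Policy,
    ignored_prefixes: Tuple[str, ...] = ("console-consumer-",),  # assumed naming
) -> Optional[int]:
    """Pick a purge offset, skipping groups that look ad hoc."""
    tracked = [
        offset for group, offset in committed.items()
        if not group.startswith(ignored_prefixes)
    ]
    if not tracked:
        return None  # no tracked group left, so purge nothing
    return policy(min(tracked), log_end)


committed = {"app-a": 900, "app-b": 700, "console-consumer-12345": 10}
print(plan_purge(committed, 1000, aggressive))            # 700
print(plan_purge(committed, 1000, offset_distance(500)))  # 500
```

The resulting offset is what such a service would hand to a purge call like the proposed purgeDataBefore(). Each team can swap in its own policy without any change on the broker side.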
>> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindong28@gmail.com> wrote:
>>
>>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
>>>
>>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindong28@gmail.com> wrote:
>>> >
>>> > > Hey Ewen,
>>> > >
>>> > > Thanks for the review. As Radai explained, it would be complex in
>>> > > terms of user configuration if we were to use committed offset to
>>> > > decide data deletion. We need a way to specify which groups need to
>>> > > consume data of this partition. The broker will also need to consume
>>> > > the entire offsets topic in that approach which has some overhead. I
>>> > > don't think it is that hard to implement. But it will likely take more
>>> > > time to discuss that approach due to the new config and the server
>>> > > side overhead.
>>> > >
>>> > > We choose to put this API in AdminClient because the API is more like
>>> > > an administrative operation (such as listGroups, deleteTopics) than a
>>> > > consumer operation. It is not necessarily called by consumer only. For
>>> > > example, we can implement the "delete data before committed offset"
>>> > > approach by running an external service which calls purgeDataBefore()
>>> > > API based on committed offset of consumer groups.
>>> > >
>>> > > I am not aware that AdminClient is not a public API. Suppose it is not
>>> > > public now, I assume we plan to make it public in the future as part
>>> > > of KIP-4. Are we not making it public because its interface is not
>>> > > stable? If so, can we just tag this new API as not stable in the code?
>>> > >
>>> >
>>> >
>>> > The AdminClient planned for KIP-4 is a new Java-based implementation.
>>> > It's definitely confusing that both will be (could be?) named
>>> > AdminClient, but we've kept the existing Scala AdminClient out of the
>>> > public API and have not required KIPs for changes to it.
>>> >
>>> > That said, I agree that if this type of API makes it into Kafka, having
>>> > a (new, Java-based) AdminClient method would definitely be a good idea.
>>> > An alternative path might be to have a Consumer-based implementation
>>> > since that seems like a very intuitive, natural way to use the protocol.
>>> > I think optimizing for the expected use case would be a good idea.
>>> >
>>> > -Ewen
>>> >
>>> Are you saying that the Scala AdminClient is not a public API and we
>>> discourage addition of any new feature to this class?
>>>
>>> I still prefer to add it to AdminClient (Java version in the future and
>>> Scala version in the short term) because I feel it belongs to admin
>>> operation instead of KafkaConsumer interface. For example, if in the
>>> future we implement the "delete data before committed offset" strategy in
>>> an external service, I feel it is a bit awkward if the service has to
>>> instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(...)
>>> to purge data. In other words, our expected use-case doesn't necessarily
>>> bind this API with consumer.
>>>
>>> I am not strong on this issue. Let's see what other committers/developers
>>> think about this.
>>>
>>>
>>> >
>>> > >
>>> > > Thanks,
>>> > > Dong
>>> > >
>>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
>>> > >
>>> > > > Dong,
>>> > > >
>>> > > > Looks like that's an internal link,
>>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
>>> > > > is the right one.
>>> > > >
>>> > > > I have a question about one of the rejected alternatives:
>>> > > >
>>> > > > > Using committed offset instead of an extra API to trigger data
>>> > > > > purge operation.
>>> > > >
>>> > > > The KIP says this would be more complicated to implement. Why is
>>> > > > that? I think brokers would have to consume the entire offsets topic,
>>> > > > but the data stored in memory doesn't seem to change and applying
>>> > > > this when updated offsets are seen seems basically the same. It might
>>> > > > also be possible to make it work even with multiple consumer groups
>>> > > > if that was desired (although that'd require tracking more data in
>>> > > > memory) as a generalization without requiring coordination between
>>> > > > the consumer groups. Given the motivation, I'm assuming this was
>>> > > > considered unnecessary since this specifically targets intermediate
>>> > > > stream processing topics.
>>> > > >
>>> > > > Another question is why expose this via AdminClient (which isn't
>>> > > > public API afaik)? Why not, for example, expose it on the Consumer,
>>> > > > which is presumably where you'd want access to it since the
>>> > > > functionality depends on the consumer actually having consumed the
>>> > > > data?
>>> > > >
>>> > > > -Ewen
>>> > > >
>>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <lindong28@gmail.com> wrote:
>>> > > >
>>> > > > > Hi all,
>>> > > > >
>>> > > > > We created KIP-107 to propose addition of purgeDataBefore() API in
>>> > > > > AdminClient.
>>> > > > >
>>> > > > > Please find the KIP wiki in the link
>>> > > > > https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal.
>>> > > > > We would love to hear your comments and suggestions.
>>> > > > >
>>> > > > > Thanks,
>>> > > > > Dong
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>
