kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dong Lin <lindon...@gmail.com>
Subject Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient
Date Tue, 10 Jan 2017 22:56:06 GMT
Bump up. I am going to initiate the vote If there is no further concern
with the KIP.

On Fri, Jan 6, 2017 at 11:23 PM, Dong Lin <lindong28@gmail.com> wrote:

> Hey Mayuresh,
>
> Thanks for the comment. If the message's offset is below low_watermark,
> then it should have been deleted by log retention policy. Thus it is OK not
> to expose this message to consumer. Does this answer your question?
>
> Thanks,
> Dong
>
> On Fri, Jan 6, 2017 at 4:21 PM, Mayuresh Gharat <
> gharatmayuresh15@gmail.com> wrote:
>
>> Hi Dong,
>>
>> Thanks for the KIP.
>>
>> I had a question (which might have been answered before).
>>
>> 1) The KIP says that the low_water_mark will be updated periodically by
>> the
>> broker like high_water_mark.
>> Essentially we want to use low_water_mark for cases where an entire
>> segment
>> cannot be deleted because may be the segment_start_offset < PurgeOffset <
>> segment_end_offset, in which case we will set the low_water_mark to
>> PurgeOffset+1.
>>
>> 2) The KIP also says that messages below low_water_mark will not be
>> exposed
>> for consumers, which does make sense since we want say that data below
>> low_water_mark is purged.
>>
>> Looking at above conditions, does it make sense not to update the
>> low_water_mark periodically but only on PurgeRequest?
>> The reason being, if we update it periodically then as per 2) we will not
>> be allowing consumers to re-consume data that is not purged but is below
>> low_water_mark.
>>
>> Thanks,
>>
>> Mayuresh
>>
>>
>> On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <lindong28@gmail.com> wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks for reviewing the KIP!
>> >
>> > 1. The low_watermark will be checkpointed in a new file named
>> >  "replication-low-watermark-checkpoint". It will have the same format
>> as
>> > the existing replication-offset-checkpoint file. This allows us the keep
>> > the existing format of checkpoint files which maps TopicPartition to
>> Long.
>> > I just updated the "Public Interface" section in the KIP wiki to explain
>> > this file.
>> >
>> > 2. I think using low_watermark from leader to trigger log retention in
>> the
>> > follower will work correctly in the sense that all messages with offset
>> <
>> > low_watermark can be deleted. But I am not sure that the efficiency is
>> the
>> > same, i.e. offset of messages which should be deleted (i.e. due to time
>> or
>> > size-based log retention policy) will be smaller than low_watermark from
>> > the leader.
>> >
>> > For example, say both the follower and the leader have messages with
>> > offsets in range [0, 2000]. If the follower does log rolling slightly
>> later
>> > than leader, the segments on follower would be [0, 1001], [1002, 2000]
>> and
>> > segments on leader would be [0, 1000], [1001, 2000]. After leader
>> deletes
>> > the first segment, the low_watermark would be 1001. Thus the first
>> segment
>> > would stay on follower's disk unnecessarily which may double disk usage
>> at
>> > worst.
>> >
>> > Since this approach doesn't save us much, I am inclined to not include
>> this
>> > change to keep the KIP simple.
>> >
>> > Dong
>> >
>> >
>> >
>> > On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <jun@confluent.io> wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > Thanks for the proposal. Looks good overall. A couple of comments.
>> > >
>> > > 1. Where is the low_watermark checkpointed? Is that
>> > > in replication-offset-checkpoint? If so, do we need to bump up the
>> > version?
>> > > Could you also describe the format change?
>> > >
>> > > 2. For topics with "delete" retention, currently we let each replica
>> > delete
>> > > old segments independently. With low_watermark, we could just let
>> leaders
>> > > delete old segments through the deletion policy and the followers will
>> > > simply delete old segments based on low_watermark. Not sure if this
>> saves
>> > > much, but is a potential option that may be worth thinking about.
>> > >
>> > > Jun
>> > >
>> > >
>> > >
>> > > On Wed, Jan 4, 2017 at 8:13 AM, radai <radai.rosenblatt@gmail.com>
>> > wrote:
>> > >
>> > > > one more example of complicated config - mirror maker.
>> > > >
>> > > > we definitely cant trust each and every topic owner to configure
>> their
>> > > > topics not to purge before they've been mirrored.
>> > > > which would mean there's a per-topic config (set by the owner) and
a
>> > > > "global" config (where mirror makers are specified) and they need
>> to be
>> > > > "merged".
>> > > > for those topics that _are_ mirrored.
>> > > > which is a changing set of topics thats stored in an external system
>> > > > outside of kafka.
>> > > > if a topic is taken out of the mirror set the MM offset would be
>> > "frozen"
>> > > > at that point and prevent clean-up for all eternity, unless its
>> > > cleaned-up
>> > > > itself.
>> > > >
>> > > > ...
>> > > >
>> > > > complexity :-)
>> > > >
>> > > > On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenblatt@gmail.com>
>> > > wrote:
>> > > >
>> > > > > in summary - i'm not opposed to the idea of a per-topic clean
up
>> > config
>> > > > > that tracks some set of consumer groups' offsets (which would
>> > probably
>> > > > work
>> > > > > for 80% of use cases), but i definitely see a need to expose
a
>> simple
>> > > API
>> > > > > for the more advanced/obscure/custom use cases (the other 20%).
>> > > > >
>> > > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenblatt@gmail.com
>> >
>> > > > wrote:
>> > > > >
>> > > > >> a major motivation for this KIP is cost savings.
>> > > > >>
>> > > > >> lots of internal systems at LI use kafka as an intermediate
pipe,
>> > and
>> > > > set
>> > > > >> the topic retention period to a "safe enough" amount of time
to
>> be
>> > > able
>> > > > to
>> > > > >> recover from crashes/downtime and catch up to "now". this
results
>> > in a
>> > > > few
>> > > > >> days' worth of retention typically.
>> > > > >>
>> > > > >> however, under normal operating conditions the consumers
are
>> mostly
>> > > > >> caught-up and so early clean-up enables a big cost savings
in
>> > storage.
>> > > > >>
>> > > > >> as for my points:
>> > > > >>
>> > > > >> 1. when discussing implementation options for automatic clean-up
>> we
>> > > > >> realized that cleaning up by keeping track of offsets stored
in
>> > kafka
>> > > > >> requires some per-topic config - you need to specify which
>> groups to
>> > > > track.
>> > > > >> this becomes a problem because:
>> > > > >>     1.1 - relatively complicated code, to be written in the
>> broker.
>> > > > >>     1.2 - configuration needs to be maintained up to date
by
>> topic
>> > > > >> "owners" - of which we have thousands. failure to do so would
>> > decrease
>> > > > the
>> > > > >> cost benefit.
>> > > > >>     1.3 - some applications have a "reconsume" / "reinit"
/
>> > > "bootstrap"
>> > > > >> workflow where they will reset their offsets to an earlier
value
>> > than
>> > > > the
>> > > > >> one stored. this means that a stored offset of X does not
always
>> > mean
>> > > > you
>> > > > >> can clean up to X-1. think of it as video encoding -some
apps
>> have
>> > > "key
>> > > > >> frames" they may seek back to which are before their current
>> offset.
>> > > > >>     1.4 - there are multiple possible strategies - you could
>> clean
>> > up
>> > > > >> aggressively, retain some "time distance" from latest, some
>> "offset
>> > > > >> distance", etc. this we think would have made it very hard
to
>> agree
>> > > on a
>> > > > >> single "correct" implementation that everyone would be happy
>> with.
>> > it
>> > > > would
>> > > > >> be better to include the raw functionality in the API and
leave
>> the
>> > > > >> "brains" to an external monitoring system where people could
>> > > > custom-taylor
>> > > > >> their logic
>> > > > >>
>> > > > >> 2. ad-hoc consumer groups: its common practice for devs to
spin
>> up
>> > > > >> console consumers and connect to a topic as a debug aid.
SREs may
>> > also
>> > > > do
>> > > > >> this. there are also various other eco-system applications
that
>> may
>> > > > >> consumer from topics (unknown to topic owners as those are
infra
>> > > > monitoring
>> > > > >> tools). obviously such consumer-groups' offsets should be
ignored
>> > for
>> > > > >> purposes of clean-up, but coming up with a bullet-proof way
to do
>> > this
>> > > > is
>> > > > >> non-trivial and again ties with implementation complexity
and
>> > > > inflexibility
>> > > > >> of a "one size fits all" solution in 1.4 above.
>> > > > >>
>> > > > >> 3. forceful clean-up: we have workflows that use kafka to
move
>> > > gigantic
>> > > > >> blobs from offline hadoop processing flows into systems.
the data
>> > > being
>> > > > >> "loaded" into such an online system can be several GBs in
side
>> and
>> > > take
>> > > > a
>> > > > >> long time to consume (they are sliced into many small msgs).
>> > sometimes
>> > > > the
>> > > > >> sender wants to abort and start a new blob before the current
>> load
>> > > > process
>> > > > >> has completed - meaning the consumer's offsets are not yet
caught
>> > up.
>> > > > >>
>> > > > >> 4. offsets outside of kafka: yes, you could force applications
to
>> > > store
>> > > > >> their offsets twice, but thats inefficient. its better to
expose
>> a
>> > > raw,
>> > > > >> simple API and let such applications manage their own clean-up
>> logic
>> > > > (this
>> > > > >> again ties into 1.4 and no "one size fits all" solution)
>> > > > >>
>> > > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindong28@gmail.com>
>> > > wrote:
>> > > > >>
>> > > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava
<
>> > > > >>> ewen@confluent.io>
>> > > > >>> wrote:
>> > > > >>>
>> > > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindong28@gmail.com
>> >
>> > > > wrote:
>> > > > >>> >
>> > > > >>> > > Hey Ewen,
>> > > > >>> > >
>> > > > >>> > > Thanks for the review. As Radai explained,
it would be
>> complex
>> > in
>> > > > >>> terms
>> > > > >>> > of
>> > > > >>> > > user configuration if we were to use committed
offset to
>> decide
>> > > > data
>> > > > >>> > > deletion. We need a way to specify which groups
need to
>> consume
>> > > > data
>> > > > >>> of
>> > > > >>> > > this partition. The broker will also need to
consume the
>> entire
>> > > > >>> offsets
>> > > > >>> > > topic in that approach which has some overhead.
I don't
>> think
>> > it
>> > > is
>> > > > >>> that
>> > > > >>> > > hard to implement. But it will likely take
more time to
>> discuss
>> > > > that
>> > > > >>> > > approach due to the new config and the server
side overhead.
>> > > > >>> > >
>> > > > >>> > > We choose to put this API in AdminClient because
the API is
>> > more
>> > > > >>> like an
>> > > > >>> > > administrative operation (such as listGroups,
deleteTopics)
>> > than
>> > > a
>> > > > >>> > consumer
>> > > > >>> > > operation. It is not necessarily called by
consumer only.
>> For
>> > > > >>> example, we
>> > > > >>> > > can implement the "delete data before committed
offset"
>> > approach
>> > > by
>> > > > >>> > running
>> > > > >>> > > an external service which calls purgeDataBefore()
API based
>> on
>> > > > >>> committed
>> > > > >>> > > offset of consumer groups.
>> > > > >>> > >
>> > > > >>> > > I am not aware that AdminClient is not a public
API.
>> Suppose it
>> > > is
>> > > > >>> not
>> > > > >>> > > public now, I assume we plan to make it public
in the
>> future as
>> > > > part
>> > > > >>> of
>> > > > >>> > > KIP-4. Are we not making it public because
its interface is
>> not
>> > > > >>> stable?
>> > > > >>> > If
>> > > > >>> > > so, can we just tag this new API as not stable
in the code?
>> > > > >>> > >
>> > > > >>> >
>> > > > >>> >
>> > > > >>> > The AdminClient planned for KIP-4 is a new Java-based
>> > > implementation.
>> > > > >>> It's
>> > > > >>> > definitely confusing that both will be (could be?)
named
>> > > AdminClient,
>> > > > >>> but
>> > > > >>> > we've kept the existing Scala AdminClient out of
the public
>> API
>> > and
>> > > > >>> have
>> > > > >>> > not required KIPs for changes to it.
>> > > > >>> >
>> > > > >>> > That said, I agree that if this type of API makes
it into
>> Kafka,
>> > > > >>> having a
>> > > > >>> > (new, Java-based) AdminClient method would definitely
be a
>> good
>> > > idea.
>> > > > >>> An
>> > > > >>> > alternative path might be to have a Consumer-based
>> implementation
>> > > > since
>> > > > >>> > that seems like a very intuitive, natural way to
use the
>> > protocol.
>> > > I
>> > > > >>> think
>> > > > >>> > optimizing for the expected use case would be a
good idea.
>> > > > >>> >
>> > > > >>> > -Ewen
>> > > > >>> >
>> > > > >>> > Are you saying that the Scala AdminClient is not
a public API
>> and
>> > > we
>> > > > >>> discourage addition of any new feature to this class?
>> > > > >>>
>> > > > >>> I still prefer to add it to AdminClient (Java version
in the
>> future
>> > > and
>> > > > >>> Scala version in the short team) because I feel it belongs
to
>> admin
>> > > > >>> operation instead of KafkaConsumer interface. For example,
if in
>> > the
>> > > > >>> future
>> > > > >>> we implement the "delete data before committed offset"
strategy
>> in
>> > an
>> > > > >>> external service, I feel it is a bit awkward if the service
has
>> to
>> > > > >>> instantiate a KafkaConsumer and call
>> KafkaConsumer.purgeDataBefore(
>> > > > ...)
>> > > > >>> to
>> > > > >>> purge data. In other words, our expected use-case doesn't
>> > necessarily
>> > > > >>> bind
>> > > > >>> this API with consumer.
>> > > > >>>
>> > > > >>> I am not strong on this issue. Let's see what other
>> > > > committers/developers
>> > > > >>> think about this.
>> > > > >>>
>> > > > >>>
>> > > > >>> >
>> > > > >>> > >
>> > > > >>> > > Thanks,
>> > > > >>> > > Dong
>> > > > >>> > >
>> > > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava
<
>> > > > >>> ewen@confluent.io
>> > > > >>> > >
>> > > > >>> > > wrote:
>> > > > >>> > >
>> > > > >>> > > > Dong,
>> > > > >>> > > >
>> > > > >>> > > > Looks like that's an internal link,
>> > > > >>> > > > https://cwiki.apache.org/confl
>> uence/display/KAFKA/KIP-107%
>> > > > >>> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
>> > > > >>> > > > is the right one.
>> > > > >>> > > >
>> > > > >>> > > > I have a question about one of the rejected
alternatives:
>> > > > >>> > > >
>> > > > >>> > > > > Using committed offset instead of
an extra API to
>> trigger
>> > > data
>> > > > >>> purge
>> > > > >>> > > > operation.
>> > > > >>> > > >
>> > > > >>> > > > The KIP says this would be more complicated
to implement.
>> Why
>> > > is
>> > > > >>> that?
>> > > > >>> > I
>> > > > >>> > > > think brokers would have to consume the
entire offsets
>> topic,
>> > > but
>> > > > >>> the
>> > > > >>> > > data
>> > > > >>> > > > stored in memory doesn't seem to change
and applying this
>> > when
>> > > > >>> updated
>> > > > >>> > > > offsets are seen seems basically the same.
It might also
>> be
>> > > > >>> possible to
>> > > > >>> > > > make it work even with multiple consumer
groups if that
>> was
>> > > > desired
>> > > > >>> > > > (although that'd require tracking more
data in memory) as
>> a
>> > > > >>> > > generalization
>> > > > >>> > > > without requiring coordination between
the consumer
>> groups.
>> > > Given
>> > > > >>> the
>> > > > >>> > > > motivation, I'm assuming this was considered
unnecessary
>> > since
>> > > > this
>> > > > >>> > > > specifically targets intermediate stream
processing
>> topics.
>> > > > >>> > > >
>> > > > >>> > > > Another question is why expose this via
AdminClient (which
>> > > isn't
>> > > > >>> public
>> > > > >>> > > API
>> > > > >>> > > > afaik)? Why not, for example, expose it
on the Consumer,
>> > which
>> > > is
>> > > > >>> > > > presumably where you'd want access to
it since the
>> > > functionality
>> > > > >>> > depends
>> > > > >>> > > on
>> > > > >>> > > > the consumer actually having consumed
the data?
>> > > > >>> > > >
>> > > > >>> > > > -Ewen
>> > > > >>> > > >
>> > > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin
<
>> > lindong28@gmail.com>
>> > > > >>> wrote:
>> > > > >>> > > >
>> > > > >>> > > > > Hi all,
>> > > > >>> > > > >
>> > > > >>> > > > > We created KIP-107 to propose addition
of
>> purgeDataBefore()
>> > > API
>> > > > >>> in
>> > > > >>> > > > > AdminClient.
>> > > > >>> > > > >
>> > > > >>> > > > > Please find the KIP wiki in the link
>> > > > https://iwww.corp.linkedin.
>> > > > >>> > > > > com/wiki/cf/display/ENGS/Kafka
>> +purgeDataBefore%28%29+API+
>> > > > >>> > > > design+proposal.
>> > > > >>> > > > > We
>> > > > >>> > > > > would love to hear your comments
and suggestions.
>> > > > >>> > > > >
>> > > > >>> > > > > Thanks,
>> > > > >>> > > > > Dong
>> > > > >>> > > > >
>> > > > >>> > > >
>> > > > >>> > >
>> > > > >>> >
>> > > > >>>
>> > > > >>
>> > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message