kafka-dev mailing list archives

From: Dong Lin <lindon...@gmail.com>
Subject: Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient
Date: Wed, 11 Jan 2017 19:51:33 GMT
Hi Mayuresh,

The low_watermark will be updated when log retention fires on the broker. It
may also be updated on the follower when the follower receives a FetchResponse
from the leader, and it may be updated on the leader when the leader receives
a PurgeRequest from the admin client.
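
To make these three update paths concrete, here is a minimal Java sketch
(illustrative only; the class and method names are hypothetical, and this is
not the broker's actual code):

    // Hypothetical per-partition state; offsets below lowWatermark are
    // considered purged.
    class PartitionLog {
        private volatile long lowWatermark = 0L;

        // 1) Log retention fires on the broker and deletes old segments.
        void onRetentionDelete(long newLogStartOffset) {
            lowWatermark = Math.max(lowWatermark, newLogStartOffset);
        }

        // 2) The follower learns the leader's low_watermark from a FetchResponse.
        void onFetchResponse(long leaderLowWatermark) {
            lowWatermark = Math.max(lowWatermark, leaderLowWatermark);
        }

        // 3) The leader handles a PurgeRequest sent by the admin client.
        void onPurgeRequest(long purgeOffset) {
            lowWatermark = Math.max(lowWatermark, purgeOffset);
        }
    }

In all three cases the low_watermark only ever moves forward.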

Thanks,
Dong

On Wed, Jan 11, 2017 at 7:37 AM, Mayuresh Gharat <gharatmayuresh15@gmail.com> wrote:

> Hi Dong,
>
> As per "If the message's offset is below low_watermark, then it should have
> been deleted by log retention policy."
> I am not sure I understand this correctly. Do you mean to say that the
> low_watermark will be updated only when log retention fires on the broker?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Jan 10, 2017 at 2:56 PM, Dong Lin <lindong28@gmail.com> wrote:
>
> > Bump up. I am going to initiate the vote if there is no further concern
> > with the KIP.
> >
> > On Fri, Jan 6, 2017 at 11:23 PM, Dong Lin <lindong28@gmail.com> wrote:
> >
> > > Hey Mayuresh,
> > >
> > > Thanks for the comment. If the message's offset is below low_watermark,
> > > then it should have been deleted by the log retention policy. Thus it is
> > > OK not to expose this message to the consumer. Does this answer your
> > > question?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Fri, Jan 6, 2017 at 4:21 PM, Mayuresh Gharat <gharatmayuresh15@gmail.com> wrote:
> > >
> > >> Hi Dong,
> > >>
> > >> Thanks for the KIP.
> > >>
> > >> I had a question (which might have been answered before).
> > >>
> > >> 1) The KIP says that the low_water_mark will be updated periodically by
> > >> the broker, like the high_water_mark. Essentially we want to use
> > >> low_water_mark for cases where an entire segment cannot be deleted
> > >> because maybe segment_start_offset < PurgeOffset < segment_end_offset,
> > >> in which case we will set the low_water_mark to PurgeOffset+1.
> > >>
> > >> 2) The KIP also says that messages below low_water_mark will not be
> > >> exposed to consumers, which makes sense since we want to say that data
> > >> below low_water_mark is purged.
> > >>
> > >> Looking at the above conditions, does it make sense not to update the
> > >> low_water_mark periodically but only on PurgeRequest?
> > >> The reason being, if we update it periodically then, as per 2), we will
> > >> not be allowing consumers to re-consume data that is not purged but is
> > >> below low_water_mark.
> > >>
> > >> Thanks,
> > >>
> > >> Mayuresh
> > >>
> > >>
> > >> On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <lindong28@gmail.com> wrote:
> > >>
> > >> > Hey Jun,
> > >> >
> > >> > Thanks for reviewing the KIP!
> > >> >
> > >> > 1. The low_watermark will be checkpointed in a new file named
> > >> > "replication-low-watermark-checkpoint". It will have the same format
> > >> > as the existing replication-offset-checkpoint file. This allows us to
> > >> > keep the existing format of checkpoint files, which maps
> > >> > TopicPartition to Long. I just updated the "Public Interface" section
> > >> > in the KIP wiki to explain this file.
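
For reference, the existing checkpoint files are plain text: a version number
on the first line, the entry count on the second, then one "topic partition
offset" line per partition. A replication-low-watermark-checkpoint file in the
same format would presumably look something like this (values made up):

    0
    2
    mytopic 0 1500
    mytopic 1 973

Here each trailing number would be the partition's low_watermark rather than
its high watermark.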
> > >> >
> > >> > 2. I think using the low_watermark from the leader to trigger log
> > >> > retention in the follower will work correctly, in the sense that all
> > >> > messages with offset < low_watermark can be deleted. But I am not sure
> > >> > the efficiency is the same, i.e. whether the offset of every message
> > >> > which should be deleted (due to the time- or size-based log retention
> > >> > policy) will be smaller than the low_watermark from the leader.
> > >> >
> > >> > For example, say both the follower and the leader have messages with
> > >> > offsets in the range [0, 2000]. If the follower does log rolling
> > >> > slightly later than the leader, the segments on the follower would be
> > >> > [0, 1001], [1002, 2000] and the segments on the leader would be
> > >> > [0, 1000], [1001, 2000]. After the leader deletes the first segment,
> > >> > the low_watermark would be 1001. Thus the first segment would stay on
> > >> > the follower's disk unnecessarily, which may double disk usage at
> > >> > worst.
> > >> >
> > >> > Since this approach doesn't save us much, I am inclined not to include
> > >> > this change, to keep the KIP simple.
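
A small Java sketch of the rule being discussed (hypothetical names, not
broker code): a follower driven only by the leader's low_watermark can drop a
segment only once the segment's last offset is below that watermark, which is
why the follower's [0, 1001] segment in the example above sticks around.

    import java.util.ArrayList;
    import java.util.List;

    public class FollowerRetentionSketch {
        record Segment(long baseOffset, long lastOffset) {}

        // Keep every segment that still contains at least one offset at or
        // above the leader's low_watermark.
        static List<Segment> retainAbove(List<Segment> segments, long leaderLowWatermark) {
            List<Segment> kept = new ArrayList<>();
            for (Segment s : segments) {
                if (s.lastOffset() >= leaderLowWatermark) {
                    kept.add(s);
                }
            }
            return kept;
        }

        public static void main(String[] args) {
            // Follower rolled slightly later than the leader.
            List<Segment> follower = List.of(new Segment(0, 1001), new Segment(1002, 2000));
            long leaderLowWatermark = 1001; // leader already deleted its [0, 1000] segment
            // Prints both segments: [0, 1001] survives because offset 1001
            // is not below the leader's low_watermark.
            System.out.println(retainAbove(follower, leaderLowWatermark));
        }
    }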
> > >> >
> > >> > Dong
> > >> >
> > >> >
> > >> >
> > >> > On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <jun@confluent.io> wrote:
> > >> >
> > >> > > Hi, Dong,
> > >> > >
> > >> > > Thanks for the proposal. Looks good overall. A couple of comments.
> > >> > >
> > >> > > 1. Where is the low_watermark checkpointed? Is that in
> > >> > > replication-offset-checkpoint? If so, do we need to bump up the
> > >> > > version? Could you also describe the format change?
> > >> > >
> > >> > > 2. For topics with "delete" retention, currently we let each replica
> > >> > > delete old segments independently. With low_watermark, we could just
> > >> > > let leaders delete old segments through the deletion policy, and the
> > >> > > followers will simply delete old segments based on low_watermark.
> > >> > > Not sure if this saves much, but it is a potential option that may
> > >> > > be worth thinking about.
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Wed, Jan 4, 2017 at 8:13 AM, radai <radai.rosenblatt@gmail.com> wrote:
> > >> > >
> > >> > > > one more example of complicated config - mirror maker.
> > >> > > >
> > >> > > > we definitely can't trust each and every topic owner to configure
> > >> > > > their topics not to purge before they've been mirrored.
> > >> > > > which would mean there's a per-topic config (set by the owner) and
> > >> > > > a "global" config (where mirror makers are specified), and they
> > >> > > > need to be "merged".
> > >> > > > for those topics that _are_ mirrored.
> > >> > > > which is a changing set of topics that's stored in an external
> > >> > > > system outside of kafka.
> > >> > > > if a topic is taken out of the mirror set, the MM offset would be
> > >> > > > "frozen" at that point and prevent clean-up for all eternity,
> > >> > > > unless it's cleaned up itself.
> > >> > > >
> > >> > > > ...
> > >> > > >
> > >> > > > complexity :-)
> > >> > > >
> > >> > > > On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenblatt@gmail.com> wrote:
> > >> > > >
> > >> > > > > in summary - i'm not opposed to the idea of a per-topic clean-up
> > >> > > > > config that tracks some set of consumer groups' offsets (which
> > >> > > > > would probably work for 80% of use cases), but i definitely see a
> > >> > > > > need to expose a simple API for the more advanced/obscure/custom
> > >> > > > > use cases (the other 20%).
> > >> > > > >
> > >> > > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenblatt@gmail.com> wrote:
> > >> > > > >
> > >> > > > >> a major motivation for this KIP is cost savings.
> > >> > > > >>
> > >> > > > >> lots of internal systems at LI use kafka as an intermediate
> > >> > > > >> pipe, and set the topic retention period to a "safe enough"
> > >> > > > >> amount of time to be able to recover from crashes/downtime and
> > >> > > > >> catch up to "now". this typically results in a few days' worth
> > >> > > > >> of retention.
> > >> > > > >>
> > >> > > > >> however, under normal operating conditions the consumers are
> > >> > > > >> mostly caught-up, and so early clean-up enables a big cost
> > >> > > > >> saving in storage.
> > >> > > > >>
> > >> > > > >> as for my points:
> > >> > > > >>
> > >> > > > >> 1. when discussing implementation options for automatic
> > >> > > > >> clean-up we realized that cleaning up by keeping track of
> > >> > > > >> offsets stored in kafka requires some per-topic config - you
> > >> > > > >> need to specify which groups to track. this becomes a problem
> > >> > > > >> because:
> > >> > > > >>     1.1 - relatively complicated code, to be written in the
> > >> > > > >> broker.
> > >> > > > >>     1.2 - configuration needs to be kept up to date by topic
> > >> > > > >> "owners" - of which we have thousands. failure to do so would
> > >> > > > >> decrease the cost benefit.
> > >> > > > >>     1.3 - some applications have a "reconsume" / "reinit" /
> > >> > > > >> "bootstrap" workflow where they will reset their offsets to an
> > >> > > > >> earlier value than the one stored. this means that a stored
> > >> > > > >> offset of X does not always mean you can clean up to X-1. think
> > >> > > > >> of it as video encoding - some apps have "key frames" they may
> > >> > > > >> seek back to which are before their current offset.
> > >> > > > >>     1.4 - there are multiple possible strategies - you could
> > >> > > > >> clean up aggressively, retain some "time distance" from latest,
> > >> > > > >> some "offset distance", etc. this we think would have made it
> > >> > > > >> very hard to agree on a single "correct" implementation that
> > >> > > > >> everyone would be happy with. it would be better to include the
> > >> > > > >> raw functionality in the API and leave the "brains" to an
> > >> > > > >> external monitoring system where people could custom-tailor
> > >> > > > >> their logic.
> > >> > > > >>
> > >> > > > >> 2. ad-hoc consumer groups: it's common practice for devs to
> > >> > > > >> spin up console consumers and connect to a topic as a debug
> > >> > > > >> aid. SREs may also do this. there are also various other
> > >> > > > >> eco-system applications that may consume from topics (unknown
> > >> > > > >> to topic owners, as those are infra monitoring tools). obviously
> > >> > > > >> such consumer-groups' offsets should be ignored for purposes of
> > >> > > > >> clean-up, but coming up with a bullet-proof way to do this is
> > >> > > > >> non-trivial and again ties into the implementation complexity
> > >> > > > >> and inflexibility of a "one size fits all" solution in 1.4
> > >> > > > >> above.
> > >> > > > >>
> > >> > > > >> 3. forceful clean-up: we have workflows that use kafka to move
> > >> > > > >> gigantic blobs from offline hadoop processing flows into online
> > >> > > > >> systems. the data being "loaded" into such an online system can
> > >> > > > >> be several GBs in size and take a long time to consume (they
> > >> > > > >> are sliced into many small msgs). sometimes the sender wants to
> > >> > > > >> abort and start a new blob before the current load process has
> > >> > > > >> completed - meaning the consumer's offsets are not yet caught
> > >> > > > >> up.
> > >> > > > >>
> > >> > > > >> 4. offsets outside of kafka: yes, you could force applications
> > >> > > > >> to store their offsets twice, but that's inefficient. it's
> > >> > > > >> better to expose a raw, simple API and let such applications
> > >> > > > >> manage their own clean-up logic (this again ties into 1.4 and
> > >> > > > >> no "one size fits all" solution).
> > >> > > > >>
> > >> > > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindong28@gmail.com> wrote:
> > >> > > > >>
> > >> > > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
> > >> > > > >>>
> > >> > > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindong28@gmail.com> wrote:
> > >> > > > >>> >
> > >> > > > >>> > > Hey Ewen,
> > >> > > > >>> > >
> > >> > > > >>> > > Thanks for the review. As Radai explained, it would be
> > >> > > > >>> > > complex in terms of user configuration if we were to use
> > >> > > > >>> > > committed offsets to decide data deletion. We need a way
> > >> > > > >>> > > to specify which groups need to consume data of this
> > >> > > > >>> > > partition. The broker will also need to consume the entire
> > >> > > > >>> > > offsets topic in that approach, which has some overhead. I
> > >> > > > >>> > > don't think it is that hard to implement, but it will
> > >> > > > >>> > > likely take more time to discuss that approach due to the
> > >> > > > >>> > > new config and the server-side overhead.
> > >> > > > >>> > >
> > >> > > > >>> > > We choose to put this API in AdminClient because the API
> > >> > > > >>> > > is more like an administrative operation (such as
> > >> > > > >>> > > listGroups, deleteTopics) than a consumer operation. It is
> > >> > > > >>> > > not necessarily called by the consumer only. For example,
> > >> > > > >>> > > we can implement the "delete data before committed offset"
> > >> > > > >>> > > approach by running an external service which calls the
> > >> > > > >>> > > purgeDataBefore() API based on the committed offsets of
> > >> > > > >>> > > consumer groups.
> > >> > > > >>> > >
> > >> > > > >>> > > I am not aware that AdminClient is not a public API.
> > >> > > > >>> > > Supposing it is not public now, I assume we plan to make
> > >> > > > >>> > > it public in the future as part of KIP-4. Are we not
> > >> > > > >>> > > making it public because its interface is not stable? If
> > >> > > > >>> > > so, can we just tag this new API as not stable in the
> > >> > > > >>> > > code?
> > >> > > > >>> > >
> > >> > > > >>> >
> > >> > > > >>> >
> > >> > > > >>> > The AdminClient planned for KIP-4 is a new Java-based
> > >> > > > >>> > implementation. It's definitely confusing that both will be
> > >> > > > >>> > (could be?) named AdminClient, but we've kept the existing
> > >> > > > >>> > Scala AdminClient out of the public API and have not
> > >> > > > >>> > required KIPs for changes to it.
> > >> > > > >>> >
> > >> > > > >>> > That said, I agree that if this type of API makes it into
> > >> > > > >>> > Kafka, having a (new, Java-based) AdminClient method would
> > >> > > > >>> > definitely be a good idea. An alternative path might be to
> > >> > > > >>> > have a Consumer-based implementation, since that seems like
> > >> > > > >>> > a very intuitive, natural way to use the protocol. I think
> > >> > > > >>> > optimizing for the expected use case would be a good idea.
> > >> > > > >>> >
> > >> > > > >>> > -Ewen
> > >> > > > >>> >
> > >> > > > >>> Are you saying that the Scala AdminClient is not a public API
> > >> > > > >>> and we discourage addition of any new feature to this class?
> > >> > > > >>>
> > >> > > > >>> I still prefer to add it to AdminClient (the Java version in
> > >> > > > >>> the future and the Scala version in the short term) because I
> > >> > > > >>> feel it belongs to admin operations instead of the
> > >> > > > >>> KafkaConsumer interface. For example, if in the future we
> > >> > > > >>> implement the "delete data before committed offset" strategy
> > >> > > > >>> in an external service, I feel it is a bit awkward if the
> > >> > > > >>> service has to instantiate a KafkaConsumer and call
> > >> > > > >>> KafkaConsumer.purgeDataBefore(...) to purge data. In other
> > >> > > > >>> words, our expected use-case doesn't necessarily bind this API
> > >> > > > >>> with the consumer.
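
To make the external-service idea concrete, here is a rough Java sketch. The
purgeDataBefore() call below stands in for the API proposed in KIP-107; its
exact signature was still under discussion at this point, so the
Map<TopicPartition, Long> form, the PurgeAdmin interface, and all other names
here are assumptions for illustration only.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    public class CommittedOffsetPurger {

        // Hypothetical stand-in for whichever AdminClient ends up exposing
        // the KIP-107 purge API.
        interface PurgeAdmin {
            void purgeDataBefore(Map<TopicPartition, Long> purgeOffsets);
        }

        // Take the committed offsets of every consumer group that must see
        // the data, keep the minimum per partition, and purge everything
        // before it.
        static void purgeUpToCommitted(PurgeAdmin admin,
                                       Map<String, Map<TopicPartition, Long>> committedByGroup) {
            Map<TopicPartition, Long> minCommitted = new HashMap<>();
            for (Map<TopicPartition, Long> groupOffsets : committedByGroup.values()) {
                for (Map.Entry<TopicPartition, Long> e : groupOffsets.entrySet()) {
                    minCommitted.merge(e.getKey(), e.getValue(), Math::min);
                }
            }
            if (!minCommitted.isEmpty()) {
                admin.purgeDataBefore(minCommitted);
            }
        }
    }

Such a service would not need a KafkaConsumer at all; it only needs a way to
read the groups' committed offsets plus the admin API.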
> > >> > > > >>>
> > >> > > > >>> I am not strong on this issue. Let's see what other
> > >> > > > >>> committers/developers think about this.
> > >> > > > >>>
> > >> > > > >>>
> > >> > > > >>> >
> > >> > > > >>> > >
> > >> > > > >>> > > Thanks,
> > >> > > > >>> > > Dong
> > >> > > > >>> > >
> > >> > > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
> > >> > > > >>> > >
> > >> > > > >>> > > > Dong,
> > >> > > > >>> > > >
> > >> > > > >>> > > > Looks like that's an internal link,
> > >> > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > >> > > > >>> > > > is the right one.
> > >> > > > >>> > > >
> > >> > > > >>> > > > I have a question about one of the rejected
> > >> > > > >>> > > > alternatives:
> > >> > > > >>> > > >
> > >> > > > >>> > > > > Using committed offset instead of an extra API to
> > >> > > > >>> > > > > trigger data purge operation.
> > >> > > > >>> > > >
> > >> > > > >>> > > > The KIP says this would be more complicated to
> > >> > > > >>> > > > implement. Why is that? I think brokers would have to
> > >> > > > >>> > > > consume the entire offsets topic, but the data stored in
> > >> > > > >>> > > > memory doesn't seem to change, and applying this when
> > >> > > > >>> > > > updated offsets are seen seems basically the same. It
> > >> > > > >>> > > > might also be possible to make it work even with
> > >> > > > >>> > > > multiple consumer groups if that was desired (although
> > >> > > > >>> > > > that'd require tracking more data in memory) as a
> > >> > > > >>> > > > generalization, without requiring coordination between
> > >> > > > >>> > > > the consumer groups. Given the motivation, I'm assuming
> > >> > > > >>> > > > this was considered unnecessary since this specifically
> > >> > > > >>> > > > targets intermediate stream processing topics.
> > >> > > > >>> > > >
> > >> > > > >>> > > > Another question is why expose this via AdminClient
> > >> > > > >>> > > > (which isn't public API afaik)? Why not, for example,
> > >> > > > >>> > > > expose it on the Consumer, which is presumably where
> > >> > > > >>> > > > you'd want access to it, since the functionality depends
> > >> > > > >>> > > > on the consumer actually having consumed the data?
> > >> > > > >>> > > >
> > >> > > > >>> > > > -Ewen
> > >> > > > >>> > > >
> > >> > > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <lindong28@gmail.com> wrote:
> > >> > > > >>> > > >
> > >> > > > >>> > > > > Hi all,
> > >> > > > >>> > > > >
> > >> > > > >>> > > > > We created KIP-107 to propose the addition of a
> > >> > > > >>> > > > > purgeDataBefore() API in AdminClient.
> > >> > > > >>> > > > >
> > >> > > > >>> > > > > Please find the KIP wiki at
> > >> > > > >>> > > > > https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal.
> > >> > > > >>> > > > > We would love to hear your comments and suggestions.
> > >> > > > >>> > > > >
> > >> > > > >>> > > > > Thanks,
> > >> > > > >>> > > > > Dong
> > >> > > > >>> > > > >
> > >> > > > >>> > > >
> > >> > > > >>> > >
> > >> > > > >>> >
> > >> > > > >>>
> > >> > > > >>
> > >> > > > >>
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -Regards,
> > >> Mayuresh R. Gharat
> > >> (862) 250-7125
> > >>
> > >
> > >
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>
