kafka-dev mailing list archives

From Dong Lin <lindon...@gmail.com>
Subject Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient
Date Sat, 07 Jan 2017 07:23:17 GMT
Hey Mayuresh,

Thanks for the comment. If a message's offset is below the low_watermark,
then it should already have been deleted by the log retention policy. Thus it
is OK not to expose this message to consumers. Does this answer your question?
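
To make the intended behavior concrete, here is a rough sketch of the kind of
check the broker could apply on the fetch path. The class, method and exception
names below are made up for illustration only; this is not the actual broker
code:

// A sketch of the intended behavior: fetch offsets below the low_watermark are
// treated as out of range, the same way offsets beyond the log end offset are.
public class LowWatermarkFetchSketch {

    static void validateFetchOffset(long fetchOffset, long lowWatermark, long logEndOffset) {
        if (fetchOffset < lowWatermark || fetchOffset > logEndOffset) {
            throw new IllegalArgumentException("Offset " + fetchOffset
                + " is outside the valid range [" + lowWatermark + ", " + logEndOffset + "]");
        }
    }

    public static void main(String[] args) {
        long lowWatermark = 1000L;   // everything below this offset has been purged
        long logEndOffset = 2000L;

        validateFetchOffset(1500L, lowWatermark, logEndOffset);  // fine
        try {
            validateFetchOffset(500L, lowWatermark, logEndOffset);  // in the purged range
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}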

Thanks,
Dong

On Fri, Jan 6, 2017 at 4:21 PM, Mayuresh Gharat <gharatmayuresh15@gmail.com> wrote:

> Hi Dong,
>
> Thanks for the KIP.
>
> I had a question (which might have been answered before).
>
> 1) The KIP says that the low_water_mark will be updated periodically by the
> broker, like the high_water_mark.
> Essentially we want to use the low_water_mark for cases where an entire
> segment cannot be deleted because, say, segment_start_offset < PurgeOffset <
> segment_end_offset, in which case we will set the low_water_mark to
> PurgeOffset+1.
>
> 2) The KIP also says that messages below the low_water_mark will not be
> exposed to consumers, which does make sense since we want to say that data
> below the low_water_mark is purged.
>
> Looking at the above conditions, does it make sense not to update the
> low_water_mark periodically but only on a PurgeRequest?
> The reason being that if we update it periodically, then as per 2) we will
> not allow consumers to re-consume data that has not been purged but is below
> the low_water_mark.
>
> Thanks,
>
> Mayuresh
>
>
> On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <lindong28@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Thanks for reviewing the KIP!
> >
> > 1. The low_watermark will be checkpointed in a new file named
> > "replication-low-watermark-checkpoint". It will have the same format as
> > the existing replication-offset-checkpoint file. This allows us to keep
> > the existing format of checkpoint files, which map TopicPartition to Long.
> > I just updated the "Public Interface" section in the KIP wiki to explain
> > this file.
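> >
> > To illustrate, the contents of this new checkpoint file would likely look
> > just like the existing replication-offset-checkpoint, i.e. a version line,
> > an entry count, and then one "topic partition offset" entry per line. The
> > entries below are made-up numbers purely for illustration:
> >
> > 0
> > 2
> > my-topic 0 1001
> > my-topic 1 0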
> >
> > 2. I think using the low_watermark from the leader to trigger log retention
> > in the follower will work correctly in the sense that all messages with
> > offset < low_watermark can be deleted. But I am not sure that the efficiency
> > is the same, i.e. that the offsets of messages which should be deleted (due
> > to the time- or size-based log retention policy) will always be smaller than
> > the low_watermark from the leader.
> >
> > For example, say both the follower and the leader have messages with
> > offsets in the range [0, 2000]. If the follower does log rolling slightly
> > later than the leader, the segments on the follower would be [0, 1001],
> > [1002, 2000] and the segments on the leader would be [0, 1000], [1001,
> > 2000]. After the leader deletes the first segment, the low_watermark would
> > be 1001. Thus the first segment would stay on the follower's disk
> > unnecessarily, which may double disk usage in the worst case.
> >
> > Since this approach doesn't save us much, I am inclined not to include this
> > change, to keep the KIP simple.
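> >
> > For reference, the follower-side rule being discussed boils down to
> > something like the sketch below (the names are illustrative, not actual
> > broker code), which is also why the follower's first segment survives in
> > the example above:
> >
> > class FollowerRetentionSketch {
> >
> >     // Delete a follower segment only if every offset in it is below the
> >     // leader's low_watermark.
> >     static boolean canDeleteSegment(long segmentLastOffset, long leaderLowWatermark) {
> >         return segmentLastOffset < leaderLowWatermark;
> >     }
> >
> >     public static void main(String[] args) {
> >         // Follower segment [0, 1001] vs. leader low_watermark 1001 from the
> >         // example above: 1001 < 1001 is false, so the segment is kept even
> >         // though the leader already deleted its own [0, 1000] segment.
> >         System.out.println(canDeleteSegment(1001L, 1001L)); // false
> >     }
> > }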
> >
> > Dong
> >
> >
> >
> > On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <jun@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the proposal. Looks good overall. A couple of comments.
> > >
> > > 1. Where is the low_watermark checkpointed? Is that
> > > in replication-offset-checkpoint? If so, do we need to bump up the
> > > version? Could you also describe the format change?
> > >
> > > 2. For topics with "delete" retention, currently we let each replica
> > > delete old segments independently. With low_watermark, we could just let
> > > leaders delete old segments through the deletion policy and the followers
> > > will simply delete old segments based on low_watermark. Not sure if this
> > > saves much, but is a potential option that may be worth thinking about.
> > >
> > > Jun
> > >
> > >
> > >
> > > On Wed, Jan 4, 2017 at 8:13 AM, radai <radai.rosenblatt@gmail.com> wrote:
> > >
> > > > one more example of complicated config - mirror maker.
> > > >
> > > > we definitely can't trust each and every topic owner to configure their
> > > > topics not to purge before they've been mirrored.
> > > > which would mean there's a per-topic config (set by the owner) and a
> > > > "global" config (where mirror makers are specified) and they need to be
> > > > "merged".
> > > > for those topics that _are_ mirrored.
> > > > which is a changing set of topics that's stored in an external system
> > > > outside of kafka.
> > > > if a topic is taken out of the mirror set, the MM offset would be
> > > > "frozen" at that point and prevent clean-up for all eternity, unless it's
> > > > cleaned up itself.
> > > >
> > > > ...
> > > >
> > > > complexity :-)
> > > >
> > > > On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenblatt@gmail.com> wrote:
> > > >
> > > > > in summary - i'm not opposed to the idea of a per-topic clean up
> > > > > config that tracks some set of consumer groups' offsets (which would
> > > > > probably work for 80% of use cases), but i definitely see a need to
> > > > > expose a simple API for the more advanced/obscure/custom use cases
> > > > > (the other 20%).
> > > > >
> > > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenblatt@gmail.com> wrote:
> > > > >
> > > > >> a major motivation for this KIP is cost savings.
> > > > >>
> > > > >> lots of internal systems at LI use kafka as an intermediate pipe, and
> > > > >> set the topic retention period to a "safe enough" amount of time to be
> > > > >> able to recover from crashes/downtime and catch up to "now". this
> > > > >> results in a few days' worth of retention typically.
> > > > >>
> > > > >> however, under normal operating conditions the consumers are mostly
> > > > >> caught-up and so early clean-up enables a big cost savings in storage.
> > > > >>
> > > > >> as for my points:
> > > > >>
> > > > >> 1. when discussing implementation options for automatic clean-up we
> > > > >> realized that cleaning up by keeping track of offsets stored in kafka
> > > > >> requires some per-topic config - you need to specify which groups to
> > > > >> track. this becomes a problem because:
> > > > >>     1.1 - relatively complicated code, to be written in the broker.
> > > > >>     1.2 - configuration needs to be maintained up to date by topic
> > > > >> "owners" - of which we have thousands. failure to do so would decrease
> > > > >> the cost benefit.
> > > > >>     1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
> > > > >> workflow where they will reset their offsets to an earlier value than
> > > > >> the one stored. this means that a stored offset of X does not always
> > > > >> mean you can clean up to X-1. think of it as video encoding - some apps
> > > > >> have "key frames" they may seek back to which are before their current
> > > > >> offset.
> > > > >>     1.4 - there are multiple possible strategies - you could clean up
> > > > >> aggressively, retain some "time distance" from latest, some "offset
> > > > >> distance", etc. this we think would have made it very hard to agree on
> > > > >> a single "correct" implementation that everyone would be happy with. it
> > > > >> would be better to include the raw functionality in the API and leave
> > > > >> the "brains" to an external monitoring system where people could
> > > > >> custom-tailor their logic. (a rough sketch of such a pluggable policy
> > > > >> follows after these points.)
> > > > >>
> > > > >> 2. ad-hoc consumer groups: it's common practice for devs to spin up
> > > > >> console consumers and connect to a topic as a debug aid. SREs may also
> > > > >> do this. there are also various other eco-system applications that may
> > > > >> consume from topics (unknown to topic owners, as those are infra
> > > > >> monitoring tools). obviously such consumer-groups' offsets should be
> > > > >> ignored for purposes of clean-up, but coming up with a bullet-proof way
> > > > >> to do this is non-trivial and again ties with the implementation
> > > > >> complexity and inflexibility of a "one size fits all" solution in 1.4
> > > > >> above.
> > > > >>
> > > > >> 3. forceful clean-up: we have workflows that use kafka to move gigantic
> > > > >> blobs from offline hadoop processing flows into online systems. the
> > > > >> data being "loaded" into such an online system can be several GBs in
> > > > >> size and take a long time to consume (they are sliced into many small
> > > > >> msgs). sometimes the sender wants to abort and start a new blob before
> > > > >> the current load process has completed - meaning the consumer's offsets
> > > > >> are not yet caught up.
> > > > >>
> > > > >> 4. offsets outside of kafka: yes, you could force applications to
> > > > >> store their offsets twice, but that's inefficient. it's better to
> > > > >> expose a raw, simple API and let such applications manage their own
> > > > >> clean-up logic (this again ties into 1.4 and no "one size fits all"
> > > > >> solution).
> > > > >>
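> > > > >> to illustrate 1.4 - the kind of pluggable policy i have in mind would
> > > > >> look roughly like the sketch below. the names are made up purely for
> > > > >> illustration and none of this is part of the KIP itself:
> > > > >>
> > > > >> public class PurgePolicySketch {
> > > > >>
> > > > >>     // hypothetical policy hook: given the log end offset, a group's
> > > > >>     // committed offset and when it was committed, decide the offset
> > > > >>     // below which data may be purged. implementations could be
> > > > >>     // aggressive, keep a "time distance", keep an "offset distance", etc.
> > > > >>     interface PurgePolicy {
> > > > >>         long purgeBefore(long logEndOffset, long committedOffset, long commitTimestampMs);
> > > > >>     }
> > > > >>
> > > > >>     // example: always keep at least minLag messages behind the committed offset.
> > > > >>     static final class OffsetDistancePolicy implements PurgePolicy {
> > > > >>         private final long minLag;
> > > > >>
> > > > >>         OffsetDistancePolicy(long minLag) {
> > > > >>             this.minLag = minLag;
> > > > >>         }
> > > > >>
> > > > >>         @Override
> > > > >>         public long purgeBefore(long logEndOffset, long committedOffset, long commitTimestampMs) {
> > > > >>             return Math.max(0L, committedOffset - minLag);
> > > > >>         }
> > > > >>     }
> > > > >> }
> > > > >>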
> > > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > >>
> > > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
> > > > >>>
> > > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > >>> >
> > > > >>> > > Hey Ewen,
> > > > >>> > >
> > > > >>> > > Thanks for the review. As Radai explained, it would be complex in
> > > > >>> > > terms of user configuration if we were to use committed offsets to
> > > > >>> > > decide data deletion. We need a way to specify which groups need
> > > > >>> > > to consume data of this partition. The broker will also need to
> > > > >>> > > consume the entire offsets topic in that approach, which has some
> > > > >>> > > overhead. I don't think it is that hard to implement. But it will
> > > > >>> > > likely take more time to discuss that approach due to the new
> > > > >>> > > config and the server-side overhead.
> > > > >>> > >
> > > > >>> > > We chose to put this API in AdminClient because the API is more
> > > > >>> > > like an administrative operation (such as listGroups, deleteTopics)
> > > > >>> > > than a consumer operation. It is not necessarily called only by a
> > > > >>> > > consumer. For example, we can implement the "delete data before
> > > > >>> > > committed offset" approach by running an external service which
> > > > >>> > > calls the purgeDataBefore() API based on the committed offsets of
> > > > >>> > > consumer groups.
> > > > >>> > >
> > > > >>> > > I am not aware that AdminClient is not a public API. Supposing it
> > > > >>> > > is not public now, I assume we plan to make it public in the future
> > > > >>> > > as part of KIP-4. Are we not making it public because its interface
> > > > >>> > > is not stable? If so, can we just tag this new API as not stable
> > > > >>> > > in the code?
> > > > >>> > >
> > > > >>> >
> > > > >>> >
> > > > >>> > The AdminClient planned for KIP-4 is a new Java-based
> > > > >>> > implementation. It's definitely confusing that both will be (could
> > > > >>> > be?) named AdminClient, but we've kept the existing Scala
> > > > >>> > AdminClient out of the public API and have not required KIPs for
> > > > >>> > changes to it.
> > > > >>> >
> > > > >>> > That said, I agree that if this type of API makes it into Kafka,
> > > > >>> > having a (new, Java-based) AdminClient method would definitely be a
> > > > >>> > good idea. An alternative path might be to have a Consumer-based
> > > > >>> > implementation since that seems like a very intuitive, natural way
> > > > >>> > to use the protocol. I think optimizing for the expected use case
> > > > >>> > would be a good idea.
> > > > >>> >
> > > > >>> > -Ewen
> > > > >>> >
> > > > >>> Are you saying that the Scala AdminClient is not a public API and we
> > > > >>> discourage the addition of any new features to this class?
> > > > >>>
> > > > >>> I still prefer to add it to AdminClient (the Java version in the
> > > > >>> future and the Scala version in the short term) because I feel it
> > > > >>> belongs to admin operations instead of the KafkaConsumer interface.
> > > > >>> For example, if in the future we implement the "delete data before
> > > > >>> committed offset" strategy in an external service, I feel it is a bit
> > > > >>> awkward if the service has to instantiate a KafkaConsumer and call
> > > > >>> KafkaConsumer.purgeDataBefore(...) to purge data. In other words, our
> > > > >>> expected use case doesn't necessarily bind this API to the consumer.
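> > > > >>>
> > > > >>> As a purely illustrative sketch of what such an external service could
> > > > >>> do (the interface and names below are hypothetical and are not the
> > > > >>> signature proposed in the KIP):
> > > > >>>
> > > > >>> import java.util.HashMap;
> > > > >>> import java.util.Map;
> > > > >>>
> > > > >>> public class PurgeByCommittedOffsetSketch {
> > > > >>>
> > > > >>>     // Hypothetical admin-side hook; the real method would live in
> > > > >>>     // AdminClient and its exact signature is defined by the KIP.
> > > > >>>     interface PurgeAdmin {
> > > > >>>         void purgeDataBefore(Map<String, Long> purgeOffsetPerPartition);
> > > > >>>     }
> > > > >>>
> > > > >>>     // The service reads the committed offsets of every consumer group
> > > > >>>     // that must see the data (keyed here by a "topic-partition" string
> > > > >>>     // for simplicity) and purges only below the minimum committed
> > > > >>>     // offset across those groups. This assumes every tracked group has
> > > > >>>     // committed an offset for every partition of interest.
> > > > >>>     static void purgeUpToSlowestGroup(PurgeAdmin admin,
> > > > >>>                                       Iterable<Map<String, Long>> committedOffsetsPerGroup) {
> > > > >>>         Map<String, Long> purgeOffsets = new HashMap<>();
> > > > >>>         for (Map<String, Long> groupOffsets : committedOffsetsPerGroup) {
> > > > >>>             for (Map.Entry<String, Long> entry : groupOffsets.entrySet()) {
> > > > >>>                 purgeOffsets.merge(entry.getKey(), entry.getValue(), Long::min);
> > > > >>>             }
> > > > >>>         }
> > > > >>>         admin.purgeDataBefore(purgeOffsets);
> > > > >>>     }
> > > > >>> }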
> > > > >>>
> > > > >>> I am not strong on this issue. Let's see what other
> > > > >>> committers/developers think about this.
> > > > >>>
> > > > >>>
> > > > >>> >
> > > > >>> > >
> > > > >>> > > Thanks,
> > > > >>> > > Dong
> > > > >>> > >
> > > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
> > > > >>> > >
> > > > >>> > > > Dong,
> > > > >>> > > >
> > > > >>> > > > Looks like that's an internal link,
> > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > > > >>> > > > is the right one.
> > > > >>> > > >
> > > > >>> > > > I have a question about one of the rejected alternatives:
> > > > >>> > > >
> > > > >>> > > > > Using committed offset instead of an extra API to trigger
> > > > >>> > > > > data purge operation.
> > > > >>> > > >
> > > > >>> > > > The KIP says this would be more complicated to implement. Why
> > > > >>> > > > is that? I think brokers would have to consume the entire
> > > > >>> > > > offsets topic, but the data stored in memory doesn't seem to
> > > > >>> > > > change, and applying this when updated offsets are seen seems
> > > > >>> > > > basically the same. It might also be possible to make it work
> > > > >>> > > > even with multiple consumer groups if that was desired (although
> > > > >>> > > > that'd require tracking more data in memory) as a generalization
> > > > >>> > > > without requiring coordination between the consumer groups.
> > > > >>> > > > Given the motivation, I'm assuming this was considered
> > > > >>> > > > unnecessary since this specifically targets intermediate stream
> > > > >>> > > > processing topics.
> > > > >>> > > >
> > > > >>> > > > Another question is why expose this via AdminClient (which
> > > > >>> > > > isn't public API afaik)? Why not, for example, expose it on the
> > > > >>> > > > Consumer, which is presumably where you'd want access to it
> > > > >>> > > > since the functionality depends on the consumer actually having
> > > > >>> > > > consumed the data?
> > > > >>> > > >
> > > > >>> > > > -Ewen
> > > > >>> > > >
> > > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > >>> > > >
> > > > >>> > > > > Hi all,
> > > > >>> > > > >
> > > > >>> > > > > We created KIP-107 to propose the addition of a
> > > > >>> > > > > purgeDataBefore() API in AdminClient.
> > > > >>> > > > >
> > > > >>> > > > > Please find the KIP wiki at the link
> > > > >>> > > > > https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal.
> > > > >>> > > > > We would love to hear your comments and suggestions.
> > > > >>> > > > >
> > > > >>> > > > > Thanks,
> > > > >>> > > > > Dong
> > > > >>> > > > > Dong
> > > > >>> > > > >
> > > > >>> > > >
> > > > >>> > >
> > > > >>> >
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > >
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>
