kafka-dev mailing list archives

From Jun Rao <...@confluent.io>
Subject Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Date Tue, 25 Oct 2016 21:52:31 GMT
One of the main reasons for retaining messages on the broker after
consumption is to support replay. A common reason for replay is to fix an
application error. So it seems a bit hard to delete log segments
just based on the committed offsets that the broker knows. An alternative
approach is to support an api that can trim the log up to a specified
offset (similar to what's being discussed in KIP-47). This way, an
application can control when and how much to trim the log.
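An API like that could look roughly as follows. This is a minimal Python simulation of the control flow only (class and method names are hypothetical, not actual Kafka APIs), illustrating a trim that deletes whole segments up to a client-specified offset:

```python
# Illustrative sketch only: a segmented log with a client-driven trim(),
# as opposed to the broker deleting segments based on committed offsets it
# tracks itself. None of these names are real Kafka APIs.

class LogSegment:
    def __init__(self, base_offset, next_offset):
        self.base_offset = base_offset   # first offset in the segment
        self.next_offset = next_offset   # one past the last offset

class PartitionLog:
    def __init__(self, segments):
        # Segments sorted by base_offset; the last one is the active segment.
        self.segments = segments

    def trim(self, offset):
        """Delete whole segments that lie entirely below `offset`.

        The active (last) segment is never deleted, mirroring the rule
        that the active segment is not eligible for retention.
        """
        kept = [s for s in self.segments[:-1] if s.next_offset > offset]
        kept.append(self.segments[-1])
        deleted = len(self.segments) - len(kept)
        self.segments = kept
        return deleted

log = PartitionLog([LogSegment(0, 100), LogSegment(100, 200), LogSegment(200, 250)])
deleted = log.trim(150)   # only segment [0, 100) is fully below offset 150
```

With such a primitive, the application decides when and how far to trim, and the broker never needs to reason about consumer groups at all.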

Thanks,

Jun

On Mon, Oct 24, 2016 at 11:11 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Overall I think the motivation is common and of interest to lots of users.
> I would like to throw in my two cents on this discussion:
>
> 1. Kafka topics can be used in different ways. For some categories of
> topics (think: "pageView" event topics), the topic is shared among
> different teams / apps within the organization, and lots of temporary
> consumers (for debugging, troubleshooting, prototype development, etc.) can
> come and go dynamically, in which case it is hard to track all such
> consumers and maintain the minimum committed offsets; on the other hand,
> there is another category of topics (think: stream-app-owned intermediate
> topics like "pricing-enriched-bid-activity", as Becket mentioned above)
> which are owned by only one or a few apps, and hence the
> consumer groups for those topics are pre-defined and roughly static. In
> this case I think it makes sense to allow such a consumer-driven log
> retention feature.
>
> 2. In this case, my question is then whether this bookkeeping of
> min-committed-offsets should be done on the broker side or on the app
> side. My gut feeling is that it would be better bookkept on the app
> (i.e. client) side, which has the full information about the "registered
> consumer groups" for certain topics and hence knows the
> min-committed-offsets. A slightly-modified KIP-47, as mentioned by Dong,
> could be a better fit, where a) the app side bookkeeps the consumer-driven
> min offset based on the groups' committed offsets, by either talking to the
> consumer clients directly or querying the broker for the committed offsets
> of those registered consumer groups, and then b) writes
> *log.retention.min.offset* periodically to the broker to let it delete old
> segments before that offset (NOTE that the semantics are exactly the same
> as in KIP-47; the only difference is that we use an offset instead of a
> timestamp, which can be honored by the same implementation of KIP-47 on
> the broker side).
>
> My arguments for letting the app side bookkeep such min-offsets and only
> letting brokers take requests to delete segments accordingly are 1) it
> keeps the broker simple: brokers do not query each other about such
> offsets or do the min() calculation, but only keep / delete messages upon
> client admin requests, and 2) it allows more generalized client-driven log
> retention policies with KIP-47 (i.e. the broker is brainless and only
> takes requests, while the client app can apply any customized logic to
> determine the values of *log.retention.min.offset* or
> *log.retention.min.timestamp* that it sends to the brokers).
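The app-side bookkeeping could be sketched like this (Python; function and group names are made up for illustration). The point is that only the pre-registered groups enter the min() calculation, so ad-hoc consumers cannot hold back retention:

```python
# Sketch: the app, not the broker, computes the min committed offset across
# its registered consumer groups for one partition, then hands that value
# to the broker as the retention floor. All names are illustrative.

def min_committed_offset(committed, registered_groups):
    """committed: dict of group -> committed offset (fetched from the broker).
    Returns the safe trim point, or None if any registered group has not
    committed yet, in which case nothing should be deleted."""
    offsets = []
    for group in registered_groups:
        if group not in committed:
            return None          # wait for this group before trimming
        offsets.append(committed[group])
    return min(offsets)

committed = {"CG1": 1200, "CG2": 950, "debug-tool-42": 10}
# Only the pre-defined groups matter; the ad-hoc debug consumer is ignored.
floor = min_committed_offset(committed, ["CG1", "CG2"])   # -> 950
```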
>
>
>
> Guozhang
>
>
> On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <becket.qin@gmail.com> wrote:
>
> > Hi David,
> >
> > > 1. What scenario is used to this configuration?
> >
> > One scenario is a stream processing pipeline. In a stream processing DAG,
> > there will be a bunch of intermediate results; we only care about the
> > consumer groups that are in the downstream of the DAG, but not other
> > groups. Ideally we want to delete the log of the intermediate topics right
> > after all the downstream processing jobs have successfully processed the
> > messages. In that case, we only care about the downstream processing jobs,
> > but not other groups. That means if a downstream job did not commit an
> > offset for some reason, we want to wait for that job. Without the
> > predefined interested groups, it is hard to achieve this.
> >
> >
> > 2. Yes, the configuration should be at topic level and set dynamically.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479818@qq.com> wrote:
> >
> > > Hi Mayuresh,
> > >     Thanks for the reply:
> > > 1.  In the log retention check schedule, the broker first finds all the
> > > consumer groups which are consuming this topic, and queries the
> > > committed offset of each of these groups for the topic using the
> > > OffsetFetch API. The min commit offset is the minimum among these
> > > committed offsets.
> > >
> > >
> > > 2.  If the console consumer is reading and committing, its commit
> > > offset will be used to calculate the min commit offset for this topic.
> > > We can avoid the random consumer using the method Becket suggested.
> > >
> > >
> > > 3. It will not delete the log immediately; the log will stay for some
> > > time (retention.commitoffset.ms), and after that we only delete the log
> > > segments whose offsets are less than the min commit offset. So the user
> > > can rewind its offset within log.retention.ms.
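That rule might be sketched as follows (Python; field and config names are hypothetical). A segment is deletable only when it is both fully below the min commit offset and older than retention.commitoffset.ms:

```python
import time

# Illustrative sketch of the consumed log retention check described above.

def consumed_retention_eligible(segment, min_commit_offset,
                                retention_commitoffset_ms, now_ms=None):
    """segment: dict with 'next_offset' (one past its last offset) and
    'last_modified_ms'. True if the segment may be deleted under the
    consumed log retention rule sketched in this thread."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    fully_consumed = segment["next_offset"] <= min_commit_offset
    old_enough = now_ms - segment["last_modified_ms"] >= retention_commitoffset_ms
    return fully_consumed and old_enough

seg = {"next_offset": 100, "last_modified_ms": 0}
# Fully consumed but still inside the grace period: kept.
early = consumed_retention_eligible(seg, 150, 60_000, now_ms=30_000)   # False
# Grace period elapsed: now eligible for deletion.
late = consumed_retention_eligible(seg, 150, 60_000, now_ms=90_000)    # True
```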
> > >
> > >
> > > Thanks,
> > > David
> > >
> > >
> > >
> > >
> > > ------------------ Original Message ------------------
> > > From: "Mayuresh Gharat" <gharatmayuresh15@gmail.com>
> > > Sent: Wednesday, October 19, 2016, 10:25 AM
> > > To: "dev" <dev@kafka.apache.org>
> > >
> > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log
> > > retention
> > >
> > >
> > >
> > > Hi David,
> > >
> > > Thanks for the KIP.
> > >
> > > I had some questions/suggestions :
> > >
> > > It would be great if you could explain in the KIP, with an example, how
> > > the min offset over all the consumers will be calculated. What I meant
> > > was, it would be great to understand, with pseudo code or a workflow if
> > > possible, how each broker knows all the consumers for the given
> > > topic-partition and how the min is calculated.
> > >
> > > Also it would be good to understand what happens if we start a console
> > > consumer which starts reading from the beginning offset, commits, and
> > > crashes immediately. How will the segments get deleted?
> > >
> > > Will it delete all the log segments if all the consumers have read up to
> > > the latest? If yes, would we be able to handle a scenario where the user
> > > wants to rewind its offset to reprocess the data, since log.retention.ms
> > > might not have been reached?
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin <becket.qin@gmail.com>
> > wrote:
> > >
> > > > Hey David,
> > > >
> > > > Thanks for replies to the questions.
> > > >
> > > > I think one major thing still not clear at this point is whether the
> > > > brokers will apply the consumed log retention only to a specific set
> > > > of interested consumer groups, or whether there is no such set of
> > > > consumer groups.
> > > >
> > > > For example, for topic T, assume we know that there will be two
> > > > downstream consumer groups CG1 and CG2 consuming data from topic T.
> > > > Will we add a topic configuration such as
> > > > "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so
> > > > that the brokers only care about CG1 and CG2? The committed offsets of
> > > > other groups are not of interest and won't have any impact on the
> > > > committed offset based log retention.
> > > >
> > > > It seems the current proposal does not have an "interested consumer
> > > > group set" configuration, so that means any random consumer group may
> > > > affect the committed offset based log retention.
> > > >
> > > > I think the committed offset based log retention seems more useful in
> > > > cases where we already know which consumer groups will be consuming
> > > > from this topic, so we will only wait for those consumer groups and
> > > > ignore the others. If a topic will be consumed by many unknown or
> > > > unpredictable consumer groups, the existing time based log retention
> > > > seems simple and clear enough. So I would argue we don't need to
> > > > address the case that some groups may come later in the committed
> > > > offset based retention.
> > > >
> > > > That said, there may still be value in keeping the data for some time
> > > > even after all the interested consumer groups have consumed the
> > > > messages. For example, in a pipelined stream processing DAG, we may
> > > > want to keep the data of an intermediate topic for some time in case a
> > > > job fails, so we can resume from a previously succeeded stage instead
> > > > of restarting the entire pipeline. Or we can use the intermediate
> > > > topic for some debugging work.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > >
> > > > On Sun, Oct 16, 2016 at 2:15 AM, 东方甲乙 <254479818@qq.com>
wrote:
> > > >
> > > > > Hi Dong,
> > > > >     The KIP is intended to solve both of these 2 cases: we specify
> > > > > a small consumed log retention time to delete the consumed data and
> > > > > avoid losing un-consumed data, and we specify a large forced log
> > > > > retention time as an upper bound for the data. I will update the KIP
> > > > > with this info.
> > > > >     Another solution which I think may be ok is to support an API to
> > > > > delete an inactive group. If a group is inactive, but its commit
> > > > > offset is still in the __commit_offsets topic and stays in the
> > > > > offset cache, we can delete it via this API.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > David
> > > > >
> > > > >
> > > > > ------------------ Original Message ------------------
> > > > > From: "Dong Lin" <lindong28@gmail.com>
> > > > > Sent: Friday, October 14, 2016, 5:01 AM
> > > > > To: "dev" <dev@kafka.apache.org>
> > > > >
> > > > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before
> > > > > log retention
> > > > >
> > > > >
> > > > >
> > > > > Hi David,
> > > > >
> > > > > As explained in the motivation section of the KIP, the problem is
> > > > > that if log retention is too small, we may lose data; and if log
> > > > > retention is too large, then we waste disk space. Therefore, we need
> > > > > to solve one of the two problems -- allow data to be persisted
> > > > > longer for consumption if log retention is set too small, or allow
> > > > > data to be expired earlier if log retention is too large. I think
> > > > > the KIP probably needs to make this clear and explain which one is
> > > > > rejected and why. Note that the choice between the two affects the
> > > > > solution -- if we want to address the first problem then
> > > > > log.retention.ms should be used as a lower bound on the actual
> > > > > retention time, and if we want to address the second problem then
> > > > > log.retention.ms should be used as an upper bound on the actual
> > > > > retention time.
> > > > >
> > > > > In both cases, we probably need to figure out a way to determine an
> > > > > "active consumer group". Maybe we can compare the
> > > > > time-since-last-commit against a threshold to determine this. In
> > > > > addition, the threshold can be overridden either per-topic or
> > > > > per-groupId. If we go along this route, the rejected solution
> > > > > (per-topic vs. per-groupId) should probably be explained in the KIP.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Oct 13, 2016 at 10:23 AM, Dong Lin <lindong28@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi David,
> > > > > >
> > > > > > Thanks for your explanation. There still seems to be an issue
> > > > > > with this solution. Please see my comment inline.
> > > > > >
> > > > > >
> > > > > > On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <254479818@qq.com>
wrote:
> > > > > >
> > > > > >> Hi Dong,
> > > > > >>     Sorry for the delay, here are the comments:
> > > > > >> 1. I think we should distinguish these two cases:
> > > > > >> (1) the group has no member, but has a commit offset: in this
> > > > > >> case we should consider its commit offset
> > > > > >> (2) the group has no member and no commit offset: skip this group
> > > > > >> Is it ok?
> > > > > >>
> > > > > >> The ListGroup API can list the groups, but this API only shows
> > > > > >> the online groups, so we should enhance the ListGroup API to also
> > > > > >> list those groups in case (1).
> > > > > >>
> > > > > > Say some user starts a consumer to consume topic A with
> > > > > > enable.auto.commit = true. Later they change the group name in the
> > > > > > config. Then the proposed solution will never execute consumed log
> > > > > > retention for topic A, right? I think a group name change is
> > > > > > pretty common and we should take care of this issue. One possible
> > > > > > solution is to add a config to specify the maximum time since the
> > > > > > last offset commit before we consider a group inactive.
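That idea could be sketched as follows (Python; max_idle_ms is a hypothetical config name): a group whose last commit is older than the threshold drops out of the min-committed-offset calculation:

```python
# Sketch of the suggested inactive-group rule: compare time-since-last-commit
# against a threshold. The config name max_idle_ms is made up here.

def active_groups(last_commit_ms, now_ms, max_idle_ms):
    """last_commit_ms: dict of group -> timestamp (ms) of its last offset
    commit. Groups idle longer than max_idle_ms are treated as inactive and
    excluded from the min-committed-offset calculation."""
    return {g for g, t in last_commit_ms.items() if now_ms - t <= max_idle_ms}

last_commit = {"CG1": 9_000, "old-console-consumer": 1_000}
groups = active_groups(last_commit, now_ms=10_000, max_idle_ms=5_000)
# -> {"CG1"}: the stale console-consumer group no longer blocks retention
```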
> > > > > >
> > > > > >
> > > > > >>
> > > > > >> 2. Because every consumer group may appear at a different time
> > > > > >> (say, group 1 starts to consume on day 1 and group 2 starts to
> > > > > >> consume on day 2), if we delete the log segment right away,
> > > > > >> group 2 cannot consume these messages. So we hope the messages
> > > > > >> can be held for a specified time. I think many use-cases will
> > > > > >> need these configs, if there are many consumer groups.
> > > > > >>
> > > > > >>
> > > > > > If we want to take care of group 2, can we simply disable
> > > > > > consumed log retention for the topic and set log retention to 1
> > > > > > day? Can you explain the benefit of enabling consumed log
> > > > > > retention and setting consumed log retention to 1 day?
> > > > > >
> > > > > > Currently the flow graph in the KIP suggests that we delete data
> > > > > > iff (consumed log retention is triggered OR forced log retention
> > > > > > is triggered). An alternative solution is to delete data iff
> > > > > > ((consumed log retention is disabled OR consumed log retention is
> > > > > > triggered) AND forced log retention is triggered). I would argue
> > > > > > that the 2nd scheme is better. Say the consumed log retention is
> > > > > > enabled. The 1st scheme basically interprets forced log retention
> > > > > > as the upper bound of the time the data can stay in Kafka. The 2nd
> > > > > > scheme interprets forced log retention as the lower bound of the
> > > > > > time the data can stay in Kafka, which is more consistent with the
> > > > > > purpose of having this forced log retention (to save disk space).
> > > > > > And if we adopt the 2nd solution, the use-case you suggested can
> > > > > > be easily addressed by setting forced log retention to 1 day and
> > > > > > enabling consumed log retention. What do you think?
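To make the difference concrete, the two schemes can be written as boolean predicates (a sketch only, not the KIP's wording):

```python
# Scheme 1 (current KIP flow): delete when either retention fires, so the
# forced retention acts as an upper bound on how long data stays.
def scheme1_delete(consumed_triggered, forced_triggered):
    return consumed_triggered or forced_triggered

# Scheme 2 (alternative): forced retention must also have fired, so it acts
# as a lower bound; data consumed early still survives until it elapses.
def scheme2_delete(consumed_enabled, consumed_triggered, forced_triggered):
    return (not consumed_enabled or consumed_triggered) and forced_triggered

# Data fully consumed after an hour, forced retention of 1 day not yet hit:
assert scheme1_delete(True, False) is True         # scheme 1 deletes now
assert scheme2_delete(True, True, False) is False  # scheme 2 keeps it a day
```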
> > > > > >
> > > > > >
> > > > > >
> > > > > >>
> > > > > >> Thanks,
> > > > > >> David
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> ------------------ Original Message ------------------
> > > > > >> From: "Dong Lin" <lindong28@gmail.com>
> > > > > >> Sent: Monday, October 10, 2016, 4:05 PM
> > > > > >> To: "dev" <dev@kafka.apache.org>
> > > > > >>
> > > > > >> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before
> > > > > >> log retention
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> Hey David,
> > > > > >>
> > > > > >> Thanks for reply. Please see comment inline.
> > > > > >>
> > > > > >> On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) <
> > > pengwei.li@huawei.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hi Dong
> > > > > >> >    Thanks for the questions:
> > > > > >> >
> > > > > >> > 1.  Now we don't distinguish inactive or active groups,
> > > > > >> > because in some cases an inactive group may become active again
> > > > > >> > and use the previous commit offset.
> > > > > >> >
> > > > > >> > So we will not delete the log segment in the consumed retention
> > > > > >> > if there are some groups that consume but do not commit, but
> > > > > >> > the log segment can be deleted by the forced retention.
> > > > > >> >
> > > > > >>
> > > > > >> So in the example I provided, the consumed log retention will be
> > > > > >> effectively disabled, right? This seems to be a real problem in
> > > > > >> operation -- we don't want log retention to be unintentionally
> > > > > >> disabled simply because someone started a tool to consume from
> > > > > >> that topic. Either this KIP should provide a way to handle this,
> > > > > >> or there should be a way for the operator to be aware of such a
> > > > > >> case and be able to re-enable consumed log retention for the
> > > > > >> topic. What do you think?
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> > 2.  These configs are used to determine the out-of-date time
> > > > > >> > of the consumed retention, like the parameters of the forced
> > > > > >> > retention (log.retention.hours, log.retention.minutes,
> > > > > >> > log.retention.ms). For example, users may want to keep the log
> > > > > >> > for 3 days; after 3 days, Kafka will delete the log segments
> > > > > >> > which have been consumed by all the consumer groups. The log
> > > > > >> > retention thread needs these parameters.
> > > > > >> >
> > > > > >> It makes sense to have configs such as log.retention.ms -- it is
> > > > > >> used to make data available for up to a configured amount of
> > > > > >> time before it is deleted. My question is: what is the use-case
> > > > > >> for making the log available for another e.g. 3 days after it
> > > > > >> has been consumed by all consumer groups? The purpose of this KIP
> > > > > >> is to allow the log to be deleted as soon as all interested
> > > > > >> consumer groups have consumed it. Can you provide a use-case for
> > > > > >> keeping the log available for a longer time after it has been
> > > > > >> consumed by all groups?
> > > > > >>
> > > > > >>
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > David
> > > > > >> >
> > > > > >> >
> > > > > >> > > Hey David,
> > > > > >> > >
> > > > > >> > > Thanks for the KIP. Can you help with the following
two
> > > questions:
> > > > > >> > >
> > > > > >> > > 1) If someone starts a consumer (e.g. kafka-console-consumer)
> > > > > >> > > to consume a topic for debug/validation purposes, a random
> > > > > >> > > consumer group may be created and an offset may be committed
> > > > > >> > > for this consumer group. If no offset commit is made for this
> > > > > >> > > consumer group in the future, will this effectively disable
> > > > > >> > > consumed log retention for this topic? In other words, how
> > > > > >> > > does this KIP distinguish active consumer groups from
> > > > > >> > > inactive ones?
> > > > > >> > >
> > > > > >> > > 2) Why do we need new configs such as
> > > > > >> > > log.retention.commitoffset.hours? Can we simply delete log
> > > > > >> > > segments if consumed log retention is enabled for this topic
> > > > > >> > > and all consumer groups have consumed the messages in the log
> > > > > >> > > segment?
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > > Dong
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) <
> > > pengwei.li@huawei.com
> > > > >
> > > > > >> > wrote:
> > > > > >> > >
> > > > > >> > > > Hi Becket,
> > > > > >> > > >
> > > > > >> > > >   Thanks for the feedback:
> > > > > >> > > > 1.  We use the simple consumer API to query the commit
> > > > > >> > > > offset, so we don't need to specify the consumer group.
> > > > > >> > > > 2.  Every broker uses the simple consumer API
> > > > > >> > > > (OffsetFetchKey) to query the commit offset in the log
> > > > > >> > > > retention process. The client can commit offsets or not.
> > > > > >> > > > 3.  It does not need to distinguish follower brokers from
> > > > > >> > > > leader brokers; every broker can query.
> > > > > >> > > > 4.  We don't need to change the protocols; we mainly change
> > > > > >> > > > the log retention process in the log manager.
> > > > > >> > > >
> > > > > >> > > > One question is that querying the min offset needs
> > > > > >> > > > O(partitions * groups) time complexity; an alternative is
> > > > > >> > > > to build an internal topic to save every partition's min
> > > > > >> > > > offset, which can reduce it to O(1).
> > > > > >> > > > I will update the wiki with more details.
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > > David
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > > Hi Pengwei,
> > > > > >> > > > >
> > > > > >> > > > > Thanks for the KIP proposal. It is a very useful KIP. At
> > > > > >> > > > > a high level, the proposed behavior looks reasonable to
> > > > > >> > > > > me.
> > > > > >> > > > >
> > > > > >> > > > > However, it seems that some of the details are not
> > > > > >> > > > > mentioned in the KIP. For example:
> > > > > >> > > > >
> > > > > >> > > > > 1. How will the expected consumer groups be specified? Is
> > > > > >> > > > > it through a per-topic dynamic configuration?
> > > > > >> > > > > 2. How do the brokers detect the consumer offsets? Is it
> > > > > >> > > > > required for a consumer to commit offsets?
> > > > > >> > > > > 3. How do all the replicas know about the committed
> > > > > >> > > > > offsets? e.g. 1) non-coordinator brokers which do not
> > > > > >> > > > > have the committed offsets, 2) follower brokers which do
> > > > > >> > > > > not have consumers directly consuming from them.
> > > > > >> > > > > 4. Are there any other changes that need to be made (e.g.
> > > > > >> > > > > new protocols) in addition to the configuration change?
> > > > > >> > > > >
> > > > > >> > > > > It would be great if you can update the wiki to have more
> > > > > >> > > > > details.
> > > > > >> > > > >
> > > > > >> > > > > Thanks,
> > > > > >> > > > >
> > > > > >> > > > > Jiangjie (Becket) Qin
> > > > > >> > > > >
> > > > > >> > > > > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei
(L) <
> > > > > >> pengwei.li@huawei.com>
> > > > > >> > > > wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hi All,
> > > > > >> > > > > >    I have made a KIP to enhance the log retention;
> > > > > >> > > > > > details as follows:
> > > > > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-68+Add+a+consumed+log+retention+before+log+retention
> > > > > >> > > > > >    Now starting a discussion thread for this KIP,
> > > > > >> > > > > > looking forward to the feedback.
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks,
> > > > > >> > > > > > David
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
>
>
>
> --
> -- Guozhang
>
