kafka-dev mailing list archives

From Dong Lin <lindon...@gmail.com>
Subject Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field
Date Tue, 09 Jan 2018 00:22:31 GMT
Hey Jason,

Certainly. This sounds good. I have updated the KIP to clarify that the
global epoch will be incremented by 1 each time a topic is deleted.

Thanks,
Dong

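For reference, a minimal sketch of the epoch-bump bookkeeping discussed in this
thread, with names invented here rather than taken from the KIP; the idea is
simply that every topic mutation, including deletion, increments an epoch so
that stale metadata and committed offsets can be recognized later:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustration only (hypothetical names): a per-topic epoch that is bumped
    // on every mutation, including deletion, so metadata or offsets taken
    // before the change can be told apart from metadata taken after it.
    class TopicEpochTracker {
        private final Map<String, Integer> topicEpochs = new ConcurrentHashMap<>();

        int onTopicCreated(String topic)    { return bump(topic); }
        int onPartitionsAdded(String topic) { return bump(topic); }
        int onTopicDeleted(String topic)    { return bump(topic); }

        private int bump(String topic) {
            // Every topic mutation results in an epoch bump.
            return topicEpochs.merge(topic, 1, Integer::sum);
        }
    }
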
On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <jason@confluent.io> wrote:

> Hi Dong,
>
>
> > I think your approach will allow the user to distinguish between the
> > metadata before and after the topic deletion. I also agree that this can
> > potentially be useful to the user. I am just not very sure whether we
> > already have a good use-case to make the additional complexity worthwhile.
> > It seems that this feature is kind of independent of the main problem of
> > this KIP. Could we add this as future work?
>
>
> Do you think it's fair if we bump the topic epoch on deletion and leave
> propagation of the epoch for deleted topics for future work? I don't think
> this adds much complexity and it makes the behavior consistent: every topic
> mutation results in an epoch bump.
>
> Thanks,
> Jason
>
> On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindong28@gmail.com> wrote:
>
> > Hey Ismael,
> >
> > I guess we actually need the user to see this field so that the user can
> > store this value in the external store together with the offset. We just
> > prefer the value to be opaque to discourage most users from interpreting
> > this value. One more advantage of using such an opaque field is to be able
> > to evolve the information (or schema) of this value without changing the
> > consumer API in the future.
> >
> > I also think it is probably OK for users to be able to interpret this
> > value, particularly for those advanced users.
> >
> > Thanks,
> > Dong
> >
> > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ismael@juma.me.uk> wrote:
> >
> > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <jason@confluent.io>
> > > wrote:
> > > >
> > > > class OffsetAndMetadata {
> > > >   long offset;
> > > >   byte[] offsetMetadata;
> > > >   String metadata;
> > > > }
> > >
> > >
> > > > Admittedly, the naming is a bit annoying, but we can probably come up
> > > > with something better. Internally the byte array would have a version. If
> > > > in the future we have anything else we need to add, we can update the
> > > > version and we wouldn't need any new APIs.
> > > >
> > >
> > > We can also add fields to a class in a compatible way. So, it seems to me
> > > that the main advantage of the byte array is that it's opaque to the user.
> > > Is that correct? If so, we could also add any opaque metadata in a subclass
> > > so that users don't even see it (unless they cast it, but then they're on
> > > their own).
> > >
> > > Ismael
> > >
> > > > The corresponding seek() and position() APIs might look something like
> > > > this:
> > > >
> > > > void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
> > > > byte[] positionMetadata(TopicPartition partition);
> > > >
> > > > What do you think?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
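A minimal sketch of how a consumer that stores offsets externally might use the
opaque byte[] metadata proposed above; positionMetadata() and the seek overload
are assumed additions from this thread, not existing consumer API:

    import org.apache.kafka.common.TopicPartition;

    // Sketch only: the interface stands in for the consumer so the example is
    // self-contained; positionMetadata() and the three-argument seek are the
    // *proposed* additions, not part of the released client.
    interface ProposedConsumerApi {
        long position(TopicPartition partition);                                  // existing
        byte[] positionMetadata(TopicPartition partition);                        // proposed
        void seek(TopicPartition partition, long offset, byte[] offsetMetadata);  // proposed
    }

    // Placeholder for whatever external system holds the offsets.
    interface ExternalStore {
        void save(TopicPartition tp, long offset, byte[] opaqueMetadata);
        long loadOffset(TopicPartition tp);
        byte[] loadOpaqueMetadata(TopicPartition tp);
    }

    class ExternalOffsetStoreExample {
        // Persist the offset together with the opaque metadata blob.
        void saveProgress(ProposedConsumerApi consumer, TopicPartition tp, ExternalStore store) {
            store.save(tp, consumer.position(tp), consumer.positionMetadata(tp));
        }

        // Restore by seeking with both values, letting the client validate its
        // metadata before resuming.
        void restoreProgress(ProposedConsumerApi consumer, TopicPartition tp, ExternalStore store) {
            consumer.seek(tp, store.loadOffset(tp), store.loadOpaqueMetadata(tp));
        }
    }
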
> > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > >
> > > > > Hey Jun, Jason,
> > > > >
> > > > > Thanks much for all the feedback. I have updated the KIP based on the
> > > > > latest discussion. Can you help check whether it looks good?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > Hmm... thinking about this more, I am not sure that the proposed API
> > > > > > is sufficient. For users that store offsets externally, we probably
> > > > > > need an extra API to return the leader_epoch and partition_epoch for
> > > > > > all partitions that consumers are consuming. I suppose these users
> > > > > > currently use position() to get the offset. Thus we probably need a
> > > > > > new method positionWithEpoch(..) to return <offset, partition_epoch,
> > > > > > leader_epoch>. Does this sound reasonable?
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > >
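A minimal sketch of what the suggested positionWithEpoch(..) could return; the
names are hypothetical, since no such API exists yet:

    import org.apache.kafka.common.TopicPartition;

    // Hypothetical shape of the suggested positionWithEpoch(..) result: the
    // current position plus the epochs needed to validate it later. Neither
    // this class nor the method exists in the consumer API today.
    class OffsetAndEpochs {
        final long offset;
        final int partitionEpoch;
        final int leaderEpoch;

        OffsetAndEpochs(long offset, int partitionEpoch, int leaderEpoch) {
            this.offset = offset;
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }
    }

    interface ProposedPositionApi {
        // Analogous to position(), but also returns the epochs, so a user who
        // stores offsets externally can persist all three values together.
        OffsetAndEpochs positionWithEpoch(TopicPartition partition);
    }
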
> > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <jun@confluent.io> wrote:
> > > > > >
> > > > > >> Hi, Dong,
> > > > > >>
> > > > > >> Yes, that's what I am thinking. OffsetEpoch will be composed of
> > > > > >> (partition_epoch, leader_epoch).
> > > > > >>
> > > > > >> Thanks,
> > > > > >>
> > > > > >> Jun
> > > > > >>
> > > > > >>
> > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > >>
> > > > > >> > Hey Jun,
> > > > > >> >
> > > > > >> > Thanks much. I like the new API that you proposed. I am not sure
> > > > > >> > what you exactly mean by offset_epoch. I suppose that we can use
> > > > > >> > the pair of (partition_epoch, leader_epoch) as the offset_epoch,
> > > > > >> > right?
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > Dong
> > > > > >> >
> > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <jun@confluent.io> wrote:
> > > > > >> >
> > > > > >> > > Hi, Dong,
> > > > > >> > >
> > > > > >> > > Got it. The api that you proposed works. The question is whether
> > > > > >> > > that's the api that we want to have in the long term. My concern
> > > > > >> > > is that while the api change is simple, the new api seems harder
> > > > > >> > > to explain and use. For example, a consumer storing offsets
> > > > > >> > > externally now needs to call waitForMetadataUpdate() after
> > > > > >> > > calling seek().
> > > > > >> > >
> > > > > >> > > An alternative approach is to make the following compatible api
> > > > > >> > > changes in Consumer.
> > > > > >> > > * Add an additional OffsetEpoch field in OffsetAndMetadata. (no
> > > > > >> > > need to change the commitSync() api)
> > > > > >> > > * Add a new api seek(TopicPartition partition, long offset,
> > > > > >> > > OffsetEpoch offsetEpoch). We can potentially deprecate the old
> > > > > >> > > api seek(TopicPartition partition, long offset) in the future.
> > > > > >> > >
> > > > > >> > > The alternative approach has a similar amount of api changes as
> > > > > >> > > yours but has the following benefits.
> > > > > >> > > 1. The api works in a similar way as how offset management works
> > > > > >> > > now and is probably what we want in the long term.
> > > > > >> > > 2. It can reset offsets better when there is data loss due to
> > > > > >> > > unclean leader election or correlated replica failure.
> > > > > >> > > 3. It can reset offsets better when a topic is recreated.
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > >
> > > > > >> > > Jun
> > > > > >> > >
> > > > > >> > >
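A minimal sketch of the alternative API shape described above, using
hypothetical outlines; OffsetEpoch and the seek overload are proposals from
this thread, not existing consumer API:

    import org.apache.kafka.common.TopicPartition;

    // Hypothetical outline only: OffsetEpoch pairs the partition_epoch with
    // the leader_epoch and rides along with the committed offset.
    class OffsetEpoch {
        final int partitionEpoch;
        final int leaderEpoch;

        OffsetEpoch(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }
    }

    class OffsetAndMetadataSketch {
        long offset;
        String metadata;
        OffsetEpoch offsetEpoch;   // the additional field; commitSync(Map<...>) itself is unchanged
    }

    interface ProposedSeekApi {
        // New overload; the existing seek(TopicPartition, long) could later be deprecated.
        void seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch);
    }
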
> > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > > Hey Jun,
> > > > > >> > > >
> > > > > >> > > > Yeah I agree that ideally we don't want an ever-growing global
> > > > > >> > > > metadata version. I just think it may be more desirable to keep
> > > > > >> > > > the consumer API simple.
> > > > > >> > > >
> > > > > >> > > > In my current proposal, the metadata version returned in the
> > > > > >> > > > fetch response will be stored together with the offset. More
> > > > > >> > > > specifically, the metadata_epoch in the new offset topic schema
> > > > > >> > > > will be the largest metadata_epoch from all the MetadataResponse
> > > > > >> > > > and FetchResponse ever received by this consumer.
> > > > > >> > > >
> > > > > >> > > > We probably don't have to change the consumer API for
> > > > > >> > > > commitSync(Map<TopicPartition, OffsetAndMetadata>). If the user
> > > > > >> > > > calls commitSync(...) to commit offset 10 for a given partition,
> > > > > >> > > > for most use-cases, this consumer instance should have consumed
> > > > > >> > > > the message with offset 9 from this partition, in which case the
> > > > > >> > > > consumer can remember and use the metadata_epoch from the
> > > > > >> > > > corresponding FetchResponse when committing the offset. If the
> > > > > >> > > > user calls commitSync(..) to commit offset 10 for a given
> > > > > >> > > > partition without having consumed the message with offset 9
> > > > > >> > > > using this consumer instance, this is probably an advanced
> > > > > >> > > > use-case. In this case the advanced user can retrieve the
> > > > > >> > > > metadata_epoch using the newly added metadataEpoch() API after
> > > > > >> > > > it fetches the message with offset 9 (probably from another
> > > > > >> > > > consumer instance) and encode this metadata_epoch in the string
> > > > > >> > > > OffsetAndMetadata.metadata. Do you think this solution would
> > > > > >> > > > work?
> > > > > >> > > >
> > > > > >> > > > By "not sure that I fully understand your latest suggestion",
> > > > > >> > > > are you referring to the solution related to unclean leader
> > > > > >> > > > election using leader_epoch in my previous email?
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > > Dong
> > > > > >> > > >
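A minimal sketch of the bookkeeping described above, with invented names: the
consumer tracks the largest metadata_epoch seen in any MetadataResponse or
FetchResponse and commits that value together with the offset:

    import org.apache.kafka.common.TopicPartition;

    // Illustration only (hypothetical names): keep the largest metadata_epoch
    // seen across metadata and fetch responses, and attach it when committing,
    // so a later reader can tell whether its own metadata is at least as new
    // as the committer's was.
    class MetadataEpochTracker {
        private int maxMetadataEpoch = -1;

        // Called for every metadata or fetch response received.
        synchronized void onResponseEpoch(int epochFromResponse) {
            maxMetadataEpoch = Math.max(maxMetadataEpoch, epochFromResponse);
        }

        synchronized int epochToCommit() {
            return maxMetadataEpoch;
        }
    }

    // What would be recorded for one partition, inside or outside Kafka.
    class CommittedOffsetAndEpoch {
        final TopicPartition partition;
        final long offset;
        final int metadataEpoch;

        CommittedOffsetAndEpoch(TopicPartition partition, long offset, MetadataEpochTracker tracker) {
            this.partition = partition;
            this.offset = offset;
            this.metadataEpoch = tracker.epochToCommit();
        }
    }
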
> > > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <jun@confluent.io> wrote:
> > > > > >> > > >
> > > > > >> > > > > Hi, Dong,
> > > > > >> > > > >
> > > > > >> > > > > Not sure that I fully understand your latest suggestion.
> > > > > >> > > > > Returning an ever-growing global metadata version itself is
> > > > > >> > > > > not ideal, but is fine. My question is whether the metadata
> > > > > >> > > > > version returned in the fetch response needs to be stored
> > > > > >> > > > > together with the offset if offsets are stored externally.
> > > > > >> > > > > If so, we also have to change the consumer API for
> > > > > >> > > > > commitSync() and need to worry about compatibility. If we
> > > > > >> > > > > don't store the metadata version together with the offset,
> > > > > >> > > > > on a consumer restart it's not clear how we can ensure the
> > > > > >> > > > > metadata in the consumer is high enough, since there is no
> > > > > >> > > > > metadata version to compare with.
> > > > > >> > > > >
> > > > > >> > > > > Thanks,
> > > > > >> > > > >
> > > > > >> > > > > Jun
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hey Jun,
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks much for the explanation.
> > > > > >> > > > > >
> > > > > >> > > > > > I understand the advantage of partition_epoch over
> > > > > >> > > > > > metadata_epoch. My current concern is that the use of
> > > > > >> > > > > > leader_epoch and the partition_epoch requires considerable
> > > > > >> > > > > > changes to the consumer's public API to take care of the
> > > > > >> > > > > > case where the user stores offsets externally. For example,
> > > > > >> > > > > > consumer.commitSync(..) would have to take a map whose
> > > > > >> > > > > > value is <offset, metadata, leader epoch, partition epoch>.
> > > > > >> > > > > > consumer.seek(...) would also need leader_epoch and
> > > > > >> > > > > > partition_epoch as parameters. Technically we can probably
> > > > > >> > > > > > still make it work in a backward compatible manner after
> > > > > >> > > > > > careful design and discussion. But these changes can make
> > > > > >> > > > > > the consumer's interface unnecessarily complex for most
> > > > > >> > > > > > users who do not store offsets externally.
> > > > > >> > > > > >
> > > > > >> > > > > > After thinking more about it, we can address all the
> > > > > >> > > > > > problems discussed by only using the metadata_epoch,
> > > > > >> > > > > > without introducing leader_epoch or the partition_epoch.
> > > > > >> > > > > > The current KIP describes the changes to the consumer API
> > > > > >> > > > > > and how the new API can be used if the user stores offsets
> > > > > >> > > > > > externally. In order to address the scenario you described
> > > > > >> > > > > > earlier, we can include metadata_epoch in the FetchResponse
> > > > > >> > > > > > and the LeaderAndIsrRequest. The consumer remembers the
> > > > > >> > > > > > largest metadata_epoch from all the FetchResponse it has
> > > > > >> > > > > > received. The metadata_epoch committed with the offset,
> > > > > >> > > > > > either within or outside Kafka, should be the largest
> > > > > >> > > > > > metadata_epoch across all FetchResponse and MetadataResponse
> > > > > >> > > > > > ever received by this consumer.
> > > > > >> > > > > >
> > > > > >> > > > > > The drawback of using only the metadata_epoch is that we
> > > > > >> > > > > > cannot always do the smart offset reset in case of unclean
> > > > > >> > > > > > leader election, which you mentioned earlier. But in most
> > > > > >> > > > > > cases, unclean leader election probably happens when the
> > > > > >> > > > > > consumer is not rebalancing/restarting. In these cases,
> > > > > >> > > > > > either the consumer is not directly affected by unclean
> > > > > >> > > > > > leader election since it is not consuming from the end of
> > > > > >> > > > > > the log, or the consumer can derive the leader_epoch from
> > > > > >> > > > > > the most recent message received before it sees
> > > > > >> > > > > > OffsetOutOfRangeException. So I am not sure it is worth
> > > > > >> > > > > > adding the leader_epoch to the consumer API to address the
> > > > > >> > > > > > remaining corner case. What do you think?
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks,
> > > > > >> > > > > > Dong
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <jun@confluent.io> wrote:
> > > > > >> > > > > >
> > > > > >> > > > > > > Hi, Dong,
> > > > > >> > > > > > >
> > > > > >> > > > > > > Thanks for the reply.
> > > > > >> > > > > > >
> > > > > >> > > > > > > To solve the topic recreation issue, we could use either
> > > > > >> > > > > > > a global metadata version or a partition-level epoch. But
> > > > > >> > > > > > > either one will be a new concept, right? To me, the latter
> > > > > >> > > > > > > seems more natural. It also makes it easier to detect if a
> > > > > >> > > > > > > consumer's offset is still valid after a topic is
> > > > > >> > > > > > > recreated. As you pointed out, we don't need to store the
> > > > > >> > > > > > > partition epoch in the message. The following is what I am
> > > > > >> > > > > > > thinking. When a partition is created, we can assign a
> > > > > >> > > > > > > partition epoch from an ever-increasing global counter and
> > > > > >> > > > > > > store it in /brokers/topics/[topic]/partitions/[partitionId]
> > > > > >> > > > > > > in ZK. The partition epoch is propagated to every broker.
> > > > > >> > > > > > > The consumer will be tracking a tuple of <offset, leader
> > > > > >> > > > > > > epoch, partition epoch> for offsets. If a topic is
> > > > > >> > > > > > > recreated, it's possible that a consumer's offset and
> > > > > >> > > > > > > leader epoch still match that in the broker, but the
> > > > > >> > > > > > > partition epoch won't. In this case, we can potentially
> > > > > >> > > > > > > still treat the consumer's offset as out of range and
> > > > > >> > > > > > > reset the offset based on the offset reset policy in the
> > > > > >> > > > > > > consumer. This seems harder to do with a global metadata
> > > > > >> > > > > > > version.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Jun
> > > > > >> > > > > > >
> > > > > >> > > > > > >
> > > > > >> > > > > > >
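A minimal sketch, with invented names, of the validity check described above: a
committed <offset, leader epoch, partition epoch> tuple is treated as out of
range when the partition epoch no longer matches the broker's:

    // Illustration only (hypothetical names): validating a committed tuple
    // against the broker's view. If the topic was deleted and recreated, the
    // partition epoch no longer matches even when offset and leader epoch
    // happen to line up, so the offset is treated as out of range and the
    // consumer's reset policy applies.
    class CommittedPosition {
        final long offset;
        final int leaderEpoch;
        final int partitionEpoch;

        CommittedPosition(long offset, int leaderEpoch, int partitionEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }
    }

    class OffsetValidity {
        static boolean isStillValid(CommittedPosition committed,
                                    int brokerPartitionEpoch,
                                    int brokerLeaderEpoch,
                                    long logEndOffset) {
            return committed.partitionEpoch == brokerPartitionEpoch
                    && committed.leaderEpoch <= brokerLeaderEpoch
                    && committed.offset <= logEndOffset;
        }
    }
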
> > > > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > >> > > > > > >
> > > > > >> > > > > > > > Hey Jun,
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > This is a very good example. After thinking through
> > > > > >> > > > > > > > this in detail, I agree that we need to commit the
> > > > > >> > > > > > > > offset with the leader epoch in order to address this
> > > > > >> > > > > > > > example.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > I think the remaining question is how to address the
> > > > > >> > > > > > > > scenario where the topic is deleted and re-created. One
> > > > > >> > > > > > > > possible solution is to commit the offset with both the
> > > > > >> > > > > > > > leader epoch and the metadata version. The logic and the
> > > > > >> > > > > > > > implementation of this solution do not require a new
> > > > > >> > > > > > > > concept (e.g. partition epoch) and do not require any
> > > > > >> > > > > > > > change to the message format or leader epoch. It also
> > > > > >> > > > > > > > allows us to order the metadata in a straightforward
> > > > > >> > > > > > > > manner, which may be useful in the future. So it may be
> > > > > >> > > > > > > > a better solution than generating a random partition
> > > > > >> > > > > > > > epoch every time we create a partition. Does this sound
> > > > > >> > > > > > > > reasonable?
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > Previously one concern with using the metadata version
> > > > > >> > > > > > > > is that the consumer will be forced to refresh metadata
> > > > > >> > > > > > > > even if the metadata version is increased due to topics
> > > > > >> > > > > > > > that the consumer is not interested in. Now I realized
> > > > > >> > > > > > > > that this is probably not a problem. Currently the
> > > > > >> > > > > > > > client will refresh metadata either due to
> > > > > >> > > > > > > > InvalidMetadataException in the response from the broker
> > > > > >> > > > > > > > or due to metadata expiry. The addition of the metadata
> > > > > >> > > > > > > > version should not increase the overhead of metadata
> > > > > >> > > > > > > > refresh caused by InvalidMetadataException. If the
> > > > > >> > > > > > > > client refreshes metadata due to expiry and it receives
> > > > > >> > > > > > > > metadata whose version is lower than the current
> > > > > >> > > > > > > > metadata version, we can reject the metadata but still
> > > > > >> > > > > > > > reset the metadata age, which essentially keeps the
> > > > > >> > > > > > > > existing behavior in the client.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > Thanks much,
> > > > > >> > > > > > > > Dong
> > > > > >> > > > > > > >
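A minimal sketch of the refresh handling described above, with invented names:
metadata carrying a lower version is rejected, but the metadata age is still
reset so expiry-driven refreshes keep their existing behavior:

    // Illustration only (hypothetical names): client-side handling of a
    // metadata response when cluster metadata carries a version.
    class ClientMetadata {
        private int currentVersion = -1;
        private long lastRefreshMs = 0L;
        private Object cluster;   // placeholder for the actual cluster view

        synchronized void update(int responseVersion, Object newCluster, long nowMs) {
            if (responseVersion < currentVersion) {
                // Stale metadata (e.g. from a broker that has not caught up):
                // reject the payload, but still reset the age so we do not
                // keep re-fetching on every expiry check.
                lastRefreshMs = nowMs;
                return;
            }
            currentVersion = responseVersion;
            cluster = newCluster;
            lastRefreshMs = nowMs;
        }
    }
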
> > > > > >> > > > > > >
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
