kafka-dev mailing list archives

From Dong Lin <lindon...@gmail.com>
Subject Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field
Date Mon, 18 Dec 2017 21:58:10 GMT
Hey Jun,

Thanks much for your comments. These are very thoughtful ideas. Please see
my comments below.

On Thu, Dec 14, 2017 at 6:38 PM, Jun Rao <jun@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the update. A few more comments below.
>
> 10. It seems that we need to return the leader epoch in the fetch response
> as well. When fetching data, we could be fetching data from a leader epoch
> older than what's returned in the metadata response. So, we want to use the
> leader epoch associated with the offset being fetched for committing
> offsets.
>

It seems that we may have two separate issues here. The first issue is that
the consumer uses metadata that is older than the metadata it used before.
The second issue is that the consumer uses metadata that is newer than the
corresponding leader epoch in the leader broker. We know that the
OffsetOutOfRangeException described in this KIP can be prevented by
avoiding the first issue. On the other hand, it seems that the
OffsetOutOfRangeException can still happen even if we avoid the second
issue: if the consumer uses an older version of metadata, the leader epoch
in its metadata may equal the leader epoch in the broker even if the leader
epoch in the broker is outdated.

Given this understanding, I am not sure why we need to return the leader
epoch in the fetch response. As long as consumer's metadata is not going
back in version, I think we are good. Did I miss something here?
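
To make "metadata is not going back in version" concrete, here is a minimal
sketch of the client-side check, using the per-partition leader epoch as the
version. The names (TopicPartition as a tuple, is_outdated, maybe_update) are
made up for illustration and are not the actual client code:

```python
from typing import Dict, Tuple

# (topic, partition) pair; a hypothetical stand-in for Kafka's TopicPartition.
TopicPartition = Tuple[str, int]


def is_outdated(cached: Dict[TopicPartition, int],
                incoming: Dict[TopicPartition, int]) -> bool:
    """True if any partition's leader epoch in `incoming` is below the cached one."""
    return any(tp in cached and epoch < cached[tp]
               for tp, epoch in incoming.items())


def maybe_update(cached: Dict[TopicPartition, int],
                 incoming: Dict[TopicPartition, int]) -> bool:
    """Apply `incoming` to `cached` unless it would move any partition backwards."""
    if is_outdated(cached, incoming):
        return False  # the caller is expected to fetch metadata again
    cached.update(incoming)
    return True
```

A response with an equal or higher epoch for every known partition is
accepted; a response with a lower epoch for any known partition is rejected
as a whole, and the cache is left untouched.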


>
> 11. Should we now extend OffsetAndMetadata used in the offset commit api in
> KafkaConsumer to include leader epoch? Similarly, should we return leader
> epoch in endOffsets(), beginningOffsets() and position()? We probably need
> to think about how to make the api backward compatible.
>

After thinking this through carefully, I think we probably don't want to
extend OffsetAndMetadata to include the leader epoch, because the leader
epoch is an implementation detail that ideally should be hidden from the
user. The consumer can include the leader epoch in the OffsetCommitRequest
after taking the offsets from commitSync(final Map<TopicPartition,
OffsetAndMetadata> offsets). Similarly, the consumer can store the leader
epoch from the OffsetFetchResponse and provide only the offset to the user
via consumer.committed(topicPartition). This solution seems to work well,
and we don't have to change the consumer's public API. Does this sound OK?
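
Roughly, the bookkeeping inside the consumer could look like the sketch
below. This is only an illustration: OffsetBookkeeper and its methods are
hypothetical names, the requests and responses are modelled as plain dicts
and tuples, and attaching the epoch last seen in metadata (or -1 if none is
known) is an assumption of the sketch rather than a settled design:

```python
from typing import Dict, Optional, Tuple

TopicPartition = Tuple[str, int]  # hypothetical stand-in for Kafka's TopicPartition
NO_LEADER_EPOCH = -1              # assumed marker for "no epoch known"


class OffsetBookkeeper:
    """Keeps leader epochs internal; callers only ever see plain offsets."""

    def __init__(self) -> None:
        self._metadata_epochs: Dict[TopicPartition, int] = {}
        self._committed: Dict[TopicPartition, Tuple[int, int]] = {}

    def on_metadata(self, tp: TopicPartition, leader_epoch: int) -> None:
        # Remember the leader epoch reported for this partition.
        self._metadata_epochs[tp] = leader_epoch

    def build_commit_request(
            self, offsets: Dict[TopicPartition, int]
    ) -> Dict[TopicPartition, Tuple[int, int]]:
        # The user supplies offsets only; the epoch is attached internally.
        return {tp: (offset, self._metadata_epochs.get(tp, NO_LEADER_EPOCH))
                for tp, offset in offsets.items()}

    def on_offset_fetch_response(self, tp: TopicPartition,
                                 offset: int, leader_epoch: int) -> None:
        # Store both values; the epoch never leaves this class.
        self._committed[tp] = (offset, leader_epoch)

    def committed(self, tp: TopicPartition) -> Optional[int]:
        # Public view: the offset alone, or None if nothing is known.
        entry = self._committed.get(tp)
        return None if entry is None else entry[0]
```

The point of the sketch is that the user-facing calls take and return
offsets only, so the public signatures do not need to change.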


>
> 12. It seems that we now need to store leader epoch in the offset topic.
> Could you include the new schema for the value of the offset topic and add
> upgrade notes?


You are right. I have updated the KIP to specify the new schema for the
value of the offset topic. Can you take another look?

For existing messages in the offset topic, leader_epoch will be missing. We
will use leader_epoch = -1 to indicate a missing leader_epoch. The consumer
behavior will then be the same as it is now, because any leader_epoch in
the MetadataResponse will be larger than the leader_epoch = -1 in the
OffsetFetchResponse. Thus we don't need a specific upgrade procedure for
this change in the offset topic schema. By "upgrade notes", do you mean the
sentences we need to include in upgrade.html in the PR later?
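
As a small illustration of why the -1 default needs no special handling,
here is a sketch of the comparison. The record layout (a dict with an
optional "leader_epoch" key) and the function names are assumptions made for
the example, not the actual offset topic schema:

```python
NO_LEADER_EPOCH = -1  # value used when the stored record has no leader_epoch


def committed_leader_epoch(offset_record: dict) -> int:
    """Leader epoch stored with a committed offset, or -1 for old records."""
    return offset_record.get("leader_epoch", NO_LEADER_EPOCH)


def metadata_is_usable(metadata_leader_epoch: int, offset_record: dict) -> bool:
    """The consumer keeps refreshing metadata until this returns True."""
    return metadata_leader_epoch >= committed_leader_epoch(offset_record)
```

For a record written before the change, metadata_is_usable() is true for any
non-negative leader epoch in the metadata, so the consumer behaves exactly as
it does today. For a record that carries an epoch, metadata with a smaller
epoch is treated as outdated.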


>
> Jun
>
>
> On Tue, Dec 12, 2017 at 5:19 PM, Dong Lin <lindong28@gmail.com> wrote:
>
> > Hey Jun,
> >
> > I see. Sounds good. Yeah it is probably simpler to leave this to another
> > KIP in the future.
> >
> > Thanks for all the comments. Since there is no further comment in the
> > community, I will open the voting thread.
> >
> > Thanks,
> > Dong
> >
> > On Mon, Dec 11, 2017 at 5:37 PM, Jun Rao <jun@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > The case that I am thinking of is network partitioning. Suppose one
> > > deploys a stretched cluster across multiple AZs in the same region. If
> > > the machines in one AZ can't communicate with brokers in other AZs due
> > > to a network issue, the brokers in that AZ won't get any new metadata.
> > >
> > > We can potentially solve this problem by requiring some kind of regular
> > > heartbeats between the controller and the broker. This may need some
> > > more thought. So, it's probably fine to leave this to another KIP in
> > > the future.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Dec 11, 2017 at 2:55 PM, Dong Lin <lindong28@gmail.com> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Thanks for the comment. I am open to improving this KIP to address
> > > > more problems. I probably need more help in understanding what the
> > > > current problem is with a consumer using outdated metadata, and
> > > > whether it is easier to address it together with this KIP.
> > > >
> > > > I agree that a consumer can potentially talk to the old leader for a
> > > > long time even after this KIP. But after this KIP, the consumer
> > > > probably should not get OffsetOutOfRangeException and therefore will
> > > > not cause the offset rewind issue. So the only problem is that the
> > > > consumer will not be able to fetch data until it has updated its
> > > > metadata. It seems that this situation can only happen if the broker
> > > > is too slow in processing LeaderAndIsrRequest, since otherwise the
> > > > consumer will be forced to update metadata due to
> > > > NotLeaderForPartitionException. So the problem we are having here is
> > > > that the consumer will not be able to fetch data if some broker is
> > > > too slow in processing LeaderAndIsrRequest.
> > > >
> > > > Because Kafka propagates LeaderAndIsrRequest asynchronously to all
> > > > brokers in the cluster, there will always be a period of time when
> > > > the consumer cannot fetch data for the partition during the
> > > > leadership change. Thus it seems more like a broker-side performance
> > > > issue than a client-side correctness issue. My gut feeling is that it
> > > > is not causing as much of a problem as the problem to be fixed in
> > > > this KIP. And if we were to address it, we would probably need to
> > > > make changes on the broker side, e.g. with a prioritized queue for
> > > > controller-related requests, which may be somewhat orthogonal to this
> > > > KIP. I am not sure it will be easier to address it with the change in
> > > > this KIP. Do you have any recommendation?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Mon, Dec 11, 2017 at 1:51 PM, Jun Rao <jun@confluent.io> wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > > My suggestion of forcing the metadata refresh from the controller
> > > > > > may not work in general, since the cached controller could be
> > > > > > outdated too. The general problem is that if a consumer's metadata
> > > > > > is outdated, it may get stuck with the old leader for a long time.
> > > > > > We can address the issue of detecting outdated metadata in a
> > > > > > separate KIP in the future if you didn't intend to address it in
> > > > > > this KIP.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > > On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Jun,
> > > > > > >
> > > > > > > Thanks much for your comments. Given that the client needs to
> > > > > > > deserialize the metadata anyway, the extra overhead of checking
> > > > > > > the per-partition version for every partition should not be a
> > > > > > > big concern. Thus it makes sense to use the leader epoch as the
> > > > > > > per-partition version instead of creating a global metadata
> > > > > > > version. I will update the KIP to do that.
> > > > > > >
> > > > > > > Regarding the detection of outdated metadata, I think it is
> > > > > > > possible to ensure that the client gets the latest metadata by
> > > > > > > fetching from the controller. Note that this requires extra
> > > > > > > logic in the controller such that the controller updates
> > > > > > > metadata directly in memory without requiring
> > > > > > > UpdateMetadataRequest. I am not sure what the main motivation
> > > > > > > for this is at the moment, and it makes the controller more of
> > > > > > > a bottleneck in the cluster, which we probably want to avoid.
> > > > > > >
> > > > > > > I think we can probably keep the current way of ensuring
> > > > > > > metadata freshness. Currently the client is forced to refresh
> > > > > > > metadata if the broker returns an error (e.g.
> > > > > > > NotLeaderForPartition) due to outdated metadata, or if the
> > > > > > > metadata does not contain the partition that the client needs.
> > > > > > > In the future, as you previously suggested, we can include the
> > > > > > > per-partition leaderEpoch in the FetchRequest/ProduceRequest so
> > > > > > > that the broker can return an error if the epoch is smaller
> > > > > > > than the cached epoch in the broker. Given that this adds more
> > > > > > > complexity to Kafka, I think we can probably think about that
> > > > > > > later, when we have a specific use case or problem to solve
> > > > > > > with up-to-date metadata. Does this sound OK?
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > >
> > > > > > > On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao <jun@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hi, Dong,
> > > > > > > >
> > > > > > > > Thanks for the reply. A few more points below.
> > > > > > > >
> > > > > > > > For dealing with how to prevent a consumer from switching
> > > > > > > > from a new leader to an old leader, your suggestion of
> > > > > > > > refreshing metadata on consumer restart until it sees a
> > > > > > > > metadata version >= the one associated with the offset works
> > > > > > > > too, as long as we guarantee that the cached metadata
> > > > > > > > versions on the brokers only go up.
> > > > > > > >
> > > > > > > > The second discussion point is whether the metadata
> > > > > > > > versioning should be per partition or global. For
> > > > > > > > partition-level versioning, you were concerned about
> > > > > > > > performance. Given that metadata updates are rare, I am not
> > > > > > > > sure it's a big concern though. Doing a million if tests is
> > > > > > > > probably going to take less than 1ms. Another thing is that
> > > > > > > > the metadata version seems to need to survive controller
> > > > > > > > failover. In your current approach, a consumer may not be
> > > > > > > > able to wait on the right version of the metadata after the
> > > > > > > > consumer restarts, since the metadata version may have been
> > > > > > > > recycled on the server side due to a controller failover
> > > > > > > > while the consumer was down. The partition-level leaderEpoch
> > > > > > > > survives controller failure and won't have this issue.
> > > > > > > >
> > > > > > > > Lastly, neither your proposal nor mine addresses the issue of
> > > > > > > > how to guarantee that a consumer detects that its metadata is
> > > > > > > > outdated. Currently, the consumer is not guaranteed to fetch
> > > > > > > > metadata from every broker within some bounded period of
> > > > > > > > time. Maybe this is out of the scope of your KIP. But one
> > > > > > > > idea is to force the consumer to refresh metadata from the
> > > > > > > > controller periodically.
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > > On Thu, Dec 7, 2017 at 11:25 AM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hey Jun,
> > > > > > > > >
> > > > > > > > > Thanks much for the comments. Great point, particularly
> > > > > > > > > regarding (3). I hadn't thought about this before.
> > > > > > > > >
> > > > > > > > > It seems that there are two possible ways in which the
> > > > > > > > > version number can be used. One solution is for the client
> > > > > > > > > to check the version number at the time it receives a
> > > > > > > > > MetadataResponse. If the version number in the
> > > > > > > > > MetadataResponse is smaller than the version number in the
> > > > > > > > > client's cache, the client will be forced to fetch metadata
> > > > > > > > > again. Another solution, as you have suggested, is for the
> > > > > > > > > broker to check the version number at the time it receives
> > > > > > > > > a request from the client. The broker will reject the
> > > > > > > > > request if the version is smaller than the version in the
> > > > > > > > > broker's cache.
> > > > > > > > >
> > > > > > > > > I am not very sure that the second solution can address the
> > > > > > > > > problem here. In the scenario described in the JIRA ticket,
> > > > > > > > > the broker's cache may be outdated because it has not
> > > > > > > > > processed the LeaderAndIsrRequest from the controller. Thus
> > > > > > > > > it may still process the client's request even if the
> > > > > > > > > version in the client's request is actually outdated. Does
> > > > > > > > > this make sense?
> > > > > > > > >
> > > > > > > > > IMO, it seems that we can address problem (3) by saving the
> > > > > > > > > metadata version together with the offset. After the
> > > > > > > > > consumer starts, it will keep fetching metadata until the
> > > > > > > > > metadata version >= the version saved with the offset of
> > > > > > > > > this partition.
> > > > > > > > >
> > > > > > > > > Regarding problems (1) and (2): Currently we use the
> > > > > > > > > version number in the MetadataResponse to ensure that the
> > > > > > > > > metadata does not go back in time. There are two
> > > > > > > > > alternative solutions to address problems (1) and (2). One
> > > > > > > > > solution is for the client to enumerate all partitions in
> > > > > > > > > the MetadataResponse, compare their epochs with those in
> > > > > > > > > the cached metadata, and reject the MetadataResponse iff
> > > > > > > > > any leader epoch is smaller. The main concern is that the
> > > > > > > > > MetadataResponse currently contains information for all
> > > > > > > > > partitions in the entire cluster, so doing this may slow
> > > > > > > > > down the client. The other solution is for the client to
> > > > > > > > > enumerate partitions only for topics registered in
> > > > > > > > > org.apache.kafka.clients.Metadata, which will be an empty
> > > > > > > > > set for the producer and the set of subscribed partitions
> > > > > > > > > for the consumer. But this degrades to all topics if the
> > > > > > > > > consumer subscribes to topics in the cluster by pattern.
> > > > > > > > >
> > > > > > > > > Note that the client will only be forced to update metadata
> > > > > > > > > if the version in the MetadataResponse is smaller than the
> > > > > > > > > version in the cached metadata. In general this should not
> > > > > > > > > be a problem. It can be a problem only if some broker is
> > > > > > > > > particularly slower than other brokers in processing
> > > > > > > > > UpdateMetadataRequest. When this is the case, it means that
> > > > > > > > > the broker is also particularly slow in processing
> > > > > > > > > LeaderAndIsrRequest, which can cause problems anyway
> > > > > > > > > because some partitions will probably have no leader during
> > > > > > > > > this period. I am not sure problems (1) and (2) cause more
> > > > > > > > > trouble than what we already have.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Dong
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Wed, Dec 6, 2017 at 6:42 PM, Jun Rao <jun@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Dong,
> > > > > > > > > >
> > > > > > > > > > Great finding on the issue. It's a real problem. A few
> > > > > > > > > > comments about the KIP. (1) I am not sure about updating
> > > > > > > > > > controller_metadata_epoch on every UpdateMetadataRequest.
> > > > > > > > > > Currently, the controller can send UpdateMetadataRequest
> > > > > > > > > > when there is no actual metadata change. Doing this may
> > > > > > > > > > require unnecessary metadata refreshes on the client. (2)
> > > > > > > > > > controller_metadata_epoch is global across all topics.
> > > > > > > > > > This means that a client may be forced to update its
> > > > > > > > > > metadata even when the metadata for the topics that it
> > > > > > > > > > cares about hasn't changed. (3) It doesn't seem that the
> > > > > > > > > > KIP handles the corner case when a consumer is restarted.
> > > > > > > > > > Say a consumer reads from the new leader, commits the
> > > > > > > > > > offset and then is restarted. On restart, the consumer
> > > > > > > > > > gets outdated metadata and fetches from the old leader.
> > > > > > > > > > Then, the consumer will run into the offset out of range
> > > > > > > > > > issue.
> > > > > > > > > >
> > > > > > > > > > Given the above, I am thinking of the following approach.
> > > > > > > > > > We actually already have metadata versioning at the
> > > > > > > > > > partition level. Each leader has a leader epoch which is
> > > > > > > > > > monotonically increasing. We can potentially propagate
> > > > > > > > > > the leader epoch back in the metadata response, and the
> > > > > > > > > > clients can cache that. This solves issues (1) and (2).
> > > > > > > > > > To solve (3), when saving an offset, we could save both
> > > > > > > > > > the offset and the corresponding leader epoch. When
> > > > > > > > > > fetching the data, the consumer provides both the offset
> > > > > > > > > > and the leader epoch. A leader will only serve the
> > > > > > > > > > request if its leader epoch is equal to or greater than
> > > > > > > > > > the leader epoch from the consumer. To achieve this, we
> > > > > > > > > > need to change the fetch request protocol and the offset
> > > > > > > > > > commit api, which requires some more thought.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > On Wed, Dec 6, 2017 at 10:57 AM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Bump up the thread.
> > > > > > > > > > >
> > > > > > > > > > > It will be great to have more comments on whether we
> > > > > > > > > > > should do it or whether there is a better way to
> > > > > > > > > > > address the motivation of this KIP.
> > > > > > > > > >
> > > > > > > > > > > On Mon, Dec 4, 2017 at 3:09 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > I don't have an interesting rejected alternative
> > > > > > > > > > > > solution to put in the KIP. If there is a good
> > > > > > > > > > > > alternative solution from anyone in this thread, I
> > > > > > > > > > > > am happy to discuss it and update the KIP
> > > > > > > > > > > > accordingly.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Dong
> > > > > > > > > > >
> > > > > > > > > > > > On Mon, Dec 4, 2017 at 1:12 PM, Ted Yu <yuzhihong@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> It is clearer now.
> > > > > > > > > > > >>
> > > > > > > > > > > >> I noticed that the Rejected Alternatives section is
> > > > > > > > > > > >> empty. Have you considered any alternatives?
> > > > > > > > > > >>
> > > > > > > > > > >> Cheers
> > > > > > > > > > >>
> > > > > > > > > > > >> On Mon, Dec 4, 2017 at 1:07 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Ted, thanks for catching this. I have updated the
> > > > > > > > > > > >> > sentence to make it readable.
> > > > > > > > > > >> >
> > > > > > > > > > >> > Thanks,
> > > > > > > > > > >> > Dong
> > > > > > > > > > >> >
> > > > > > > > > > > >> > On Sat, Dec 2, 2017 at 3:05 PM, Ted Yu <yuzhihong@gmail.com> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > > bq. It the controller_epoch of the incoming
> > > > > > > > > > > >> > > MetadataResponse, or if the controller_epoch is
> > > > > > > > > > > >> > > the same but the controller_metadata_epoch
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Can you update the above sentence so that the
> > > > > > > > > > > >> > > intention is clearer?
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Thanks
> > > > > > > > > > >> > >
> > > > > > > > > > > >> > > On Fri, Dec 1, 2017 at 6:33 PM, Dong Lin <lindong28@gmail.com> wrote:
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > > Hi all,
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > I have created KIP-232: Detect outdated
> > > > > > > > > > > >> > > > metadata by adding ControllerMetadataEpoch
> > > > > > > > > > > >> > > > field:
> > > > > > > > > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+by+adding+ControllerMetadataEpoch+field
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > The KIP proposes to add fields in
> > > > > > > > > > > >> > > > MetadataResponse and UpdateMetadataRequest so
> > > > > > > > > > > >> > > > that clients can reject outdated metadata and
> > > > > > > > > > > >> > > > avoid unnecessary OffsetOutOfRangeException.
> > > > > > > > > > > >> > > > Otherwise there is currently a race condition
> > > > > > > > > > > >> > > > that can cause the consumer to reset its
> > > > > > > > > > > >> > > > offset, which negatively affects the
> > > > > > > > > > > >> > > > consumer's availability.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > Feedback and suggestions are welcome!
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > Regards,
> > > > > > > > > > >> > > > Dong
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > >
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
