kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dong Lin <lindon...@gmail.com>
Subject Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field
Date Wed, 17 Jan 2018 18:10:42 GMT
Thinking about point 61 more, I realize that the async zookeeper read may
make it less of an issue for controller to read more zookeeper nodes.
Writing partition_epoch in the per-partition znode makes it simpler to
handle the broker failure between zookeeper writes for a topic creation. I
have updated the KIP to use the suggested approach.


On Wed, Jan 17, 2018 at 9:57 AM, Dong Lin <lindong28@gmail.com> wrote:

> Hey Jun,
>
> Thanks much for the comments. Please see my comments inline.
>
> On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <jun@confluent.io> wrote:
>
>> Hi, Dong,
>>
>> Thanks for the updated KIP. Looks good to me overall. Just a few minor
>> comments.
>>
>> 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition partition):
>> It
>> seems that there is no need to return metadata. We probably want to return
>> sth like OffsetAndEpoch.
>>
>
> Previously I think we may want to re-use the existing class to keep our
> consumer interface simpler. I have updated the KIP to add class
> OffsetAndOffsetEpoch. I didn't use OffsetAndEpoch because user may confuse
> this name with OffsetEpoch. Does this sound OK?
>
>
>>
>> 61. Should we store partition_epoch in
>> /brokers/topics/[topic]/partitions/[partitionId] in ZK?
>>
>
> I have considered this. I think the advantage of adding the
> partition->partition_epoch map in the existing
> znode /brokers/topics/[topic]/partitions is that controller only needs to
> read one znode per topic to gets its partition_epoch information. Otherwise
> controller may need to read one extra znode per partition to get the same
> information.
>
> When we delete partition or expand partition of a topic, someone needs to
> modify partition->partition_epoch map in znode
> /brokers/topics/[topic]/partitions. This may seem a bit more complicated
> than simply adding or deleting znode /brokers/topics/[topic]/partitions/[partitionId].
> But the complexity is probably similar to the existing operation of
> modifying the partition->replica_list mapping in znode
> /brokers/topics/[topic]. So not sure it is better to store the
> partition_epoch in /brokers/topics/[topic]/partitions/[partitionId]. What
> do you think?
>
>
>>
>> 62. For checking outdated metadata in the client, we probably want to add
>> when max_partition_epoch will be used.
>>
>
> The max_partition_epoch is used in the Proposed Changes -> Client's
> metadata refresh section to determine whether a metadata is outdated. And
> this formula is referenced and re-used in other sections to determine
> whether a metadata is outdated. Does this formula look OK?
>
>
>>
>> 63. "The leader_epoch should be the largest leader_epoch of messages whose
>> offset < the commit offset. If no message has been consumed since consumer
>> initialization, the leader_epoch from seek(...) or OffsetFetchResponse
>> should be used. The partition_epoch should be read from the last
>> FetchResponse corresponding to the given partition and commit offset. ":
>> leader_epoch and partition_epoch are associated with an offset. So, if no
>> message is consumed, there is no offset and therefore there is no need to
>> read leader_epoch and partition_epoch. Also, the leader_epoch associated
>> with the offset should just come from the messages returned in the fetch
>> response.
>>
>
> I am thinking that, if user calls seek(..) and commitSync(...) without
> consuming any messages, we should re-use the leader_epoch and
> partition_epoch provided by the seek(...) in the OffsetCommitRequest. And
> if messages have been successfully consumed, then leader_epoch will come
> from the messages returned in the fetch response. The condition "messages
> whose offset < the commit offset" is needed to take care of the log
> compacted topic which may have offset gap due to log cleaning.
>
> Did I miss something here? Or should I rephrase the paragraph to make it
> less confusing?
>
>
>> 64. Could you include the public methods in the OffsetEpoch class?
>>
>
> I mistakenly deleted the definition of OffsetEpoch class from the KIP. I
> just added it back with the public methods. Could you take another look?
>
>
>>
>> Jun
>>
>>
>> On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin <lindong28@gmail.com> wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks much. I agree that we can not rely on committed offsets to be
>> always
>> > deleted when we delete topic. So it is necessary to use a per-partition
>> > epoch that does not change unless this partition is deleted. I also
>> agree
>> > that it is very nice to be able to uniquely identify a message with
>> > (offset, leader_epoch, partition_epoch) in face of potential topic
>> deletion
>> > and unclean leader election.
>> >
>> > I agree with all your comments. And I have updated the KIP based on our
>> > latest discussion. In addition, I added InvalidPartitionEpochException
>> > which will be thrown by consumer.poll() if the partition_epoch
>> associated
>> > with the partition, which can be given to consumer using seek(...), is
>> > different from the partition_epoch in the FetchResponse.
>> >
>> > Can you take another look at the latest KIP?
>> >
>> > Thanks!
>> > Dong
>> >
>> >
>> >
>> > On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao <jun@confluent.io> wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > My replies are the following.
>> > >
>> > > 60. What you described could also work. The drawback is that we will
>> be
>> > > unnecessarily changing the partition epoch when a partition hasn't
>> really
>> > > changed. I was imagining that the partition epoch will be stored in
>> > > /brokers/topics/[topic]/partitions/[partitionId], instead of at the
>> > topic
>> > > level. So, not sure if ZK size limit is an issue.
>> > >
>> > > 61, 62 and 65. To me, the offset + offset_epoch is a unique identifier
>> > for
>> > > a message. So, if a message hasn't changed, the offset and the
>> associated
>> > > offset_epoch ideally should remain the same (it will be kind of weird
>> if
>> > > two consumer apps save the offset on the same message, but the
>> > offset_epoch
>> > > are different). partition_epoch + leader_epoch give us that.
>> > global_epoch +
>> > > leader_epoch don't. If we use this approach, we can solve not only the
>> > > problem that you have identified, but also other problems when there
>> is
>> > > data loss or topic re-creation more reliably. For example, in the
>> future,
>> > > if we include the partition_epoch and leader_epoch in the fetch
>> request,
>> > > the server can do a more reliable check of whether that offset is
>> valid
>> > or
>> > > not. I am not sure that we can rely upon all external offsets to be
>> > removed
>> > > on topic deletion. For example, a topic may be deleted by an admin who
>> > may
>> > > not know all the applications.
>> > >
>> > > If we agree on the above, the second question is then how to reliably
>> > > propagate the partition_epoch and the leader_epoch to the consumer
>> when
>> > > there are leader or partition changes. The leader_epoch comes from the
>> > > message, which is reliable. So, I was suggesting that when we store an
>> > > offset, we can just store the leader_epoch from the message set
>> > containing
>> > > that offset. Similarly, I was thinking that if the partition_epoch is
>> in
>> > > the fetch response, we can propagate partition_epoch reliably where is
>> > > partition_epoch change.
>> > >
>> > > 63. My point is that once a leader is producing a message in the new
>> > > partition_epoch, ideally, we should associate the new offsets with the
>> > new
>> > > partition_epoch. Otherwise, the offset_epoch won't be the correct
>> unique
>> > > identifier (useful for solving other problems mentioned above). I was
>> > > originally thinking that the leader will include the partition_epoch
>> in
>> > the
>> > > metadata cache in the fetch response. It's just that right now,
>> metadata
>> > > cache is updated on UpdateMetadataRequest, which typically happens
>> after
>> > > the LeaderAndIsrRequest. Another approach is for the leader to cache
>> the
>> > > partition_epoch in the Partition object and return that (instead of
>> the
>> > one
>> > > in metadata cache) in the fetch response.
>> > >
>> > > 65. It seems to me that the global_epoch and the partition_epoch have
>> > > different purposes. A partition_epoch has the benefit that it (1) can
>> be
>> > > used to form a unique identifier for a message and (2) can be used to
>> > > solve other
>> > > corner case problems in the future. I am not sure having just a
>> > > global_epoch can achieve these. global_epoch is useful to determine
>> which
>> > > version of the metadata is newer, especially with topic deletion.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <lindong28@gmail.com>
>> wrote:
>> > >
>> > > > Regarding the use of the global epoch in 65), it is very similar to
>> the
>> > > > proposal of the metadata_epoch we discussed earlier. The main
>> > difference
>> > > is
>> > > > that this epoch is incremented when we create/expand/delete topic
>> and
>> > > does
>> > > > not change when controller re-send metadata.
>> > > >
>> > > > I looked at our previous discussion. It seems that we prefer
>> > > > partition_epoch over the metadata_epoch because 1) we prefer not to
>> > have
>> > > an
>> > > > ever growing metadata_epoch and 2) we can reset offset better when
>> > topic
>> > > is
>> > > > re-created. The use of global topic_epoch avoids the drawback of an
>> > ever
>> > > > quickly ever growing metadata_epoch. Though the global epoch does
>> not
>> > > allow
>> > > > us to recognize the invalid offset committed before the topic
>> > > re-creation,
>> > > > we can probably just delete the offset when we delete a topic. Thus
>> I
>> > am
>> > > > not very sure whether it is still worthwhile to have a per-partition
>> > > > partition_epoch if the metadata already has the global epoch.
>> > > >
>> > > >
>> > > > On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindong28@gmail.com>
>> wrote:
>> > > >
>> > > > > Hey Jun,
>> > > > >
>> > > > > Thanks so much. These comments very useful. Please see below my
>> > > comments.
>> > > > >
>> > > > > On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <jun@confluent.io> wrote:
>> > > > >
>> > > > >> Hi, Dong,
>> > > > >>
>> > > > >> Thanks for the updated KIP. A few more comments.
>> > > > >>
>> > > > >> 60. Perhaps having a partition epoch is more flexible since in
>> the
>> > > > future,
>> > > > >> we may support deleting a partition as well.
>> > > > >>
>> > > > >
>> > > > > Yeah I have considered this. I think we can probably still support
>> > > > > deleting a partition by using the topic_epoch -- when partition
>> of a
>> > > > topic
>> > > > > is deleted or created, epoch of all partitions of this topic will
>> be
>> > > > > incremented by 1. Therefore, if that partition is re-created
>> later,
>> > the
>> > > > > epoch of that partition will still be larger than its epoch before
>> > the
>> > > > > deletion, which still allows the client to order the metadata for
>> the
>> > > > > purpose of this KIP. Does this sound reasonable?
>> > > > >
>> > > > > The advantage of using topic_epoch instead of partition_epoch is
>> that
>> > > the
>> > > > > size of the /brokers/topics/[topic] znode and request/response
>> size
>> > can
>> > > > be
>> > > > > smaller. We have a limit on the maximum size of znode (typically
>> > 1MB).
>> > > > Use
>> > > > > partition epoch can effectively reduce the number of partitions
>> that
>> > > can
>> > > > be
>> > > > > described by the /brokers/topics/[topic] znode.
>> > > > >
>> > > > > One use-case of partition_epoch for client to detect that the
>> > committed
>> > > > > offset, either from kafka offset topic or from the external store
>> is
>> > > > > invalid after partition deletion and re-creation. However, it
>> seems
>> > > that
>> > > > we
>> > > > > can also address this use-case with other approaches. For example,
>> > when
>> > > > > AdminClient deletes partitions, it can also delete the committed
>> > > offsets
>> > > > > for those partitions from the offset topic. If user stores offset
>> > > > > externally, it might make sense for user to similarly remove
>> offsets
>> > of
>> > > > > related partitions after these partitions are deleted. So I am not
>> > sure
>> > > > > that we should use partition_epoch in this KIP.
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> 61. It seems that the leader epoch returned in the position()
>> call
>> > > > should
>> > > > >> the the leader epoch returned in the fetch response, not the one
>> in
>> > > the
>> > > > >> metadata cache of the client.
>> > > > >
>> > > > >
>> > > > > I think this is a good idea. Just to double check, this change
>> does
>> > not
>> > > > > affect the correctness or performance of this KIP. But it can be
>> > useful
>> > > > if
>> > > > > we want to use the leader_epoch to better handle the offset rest
>> in
>> > > case
>> > > > of
>> > > > > unclean leader election, which is listed in the future work. Is
>> this
>> > > > > understanding correct?
>> > > > >
>> > > > > I have updated the KIP to specify that the leader_epoch returned
>> by
>> > > > > position() should be the largest leader_epoch of those already
>> > consumed
>> > > > > messages whose offset < position. If no message has been consumed
>> > since
>> > > > > consumer initialization, the leader_epoch from seek() or
>> > > > > OffsetFetchResponse should be used. The offset included in the
>> > > > > OffsetCommitRequest will also be determined in the similar manner.
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> 62. I am wondering if we should return the partition epoch in the
>> > > fetch
>> > > > >> response as well. In the current proposal, if a topic is
>> recreated
>> > and
>> > > > the
>> > > > >> new leader is on the same broker as the old one, there is
>> nothing to
>> > > > force
>> > > > >> the metadata refresh in the client. So, the client may still
>> > associate
>> > > > the
>> > > > >> offset with the old partition epoch.
>> > > > >>
>> > > > >
>> > > > > Could you help me understand the problem if a client associates
>> old
>> > > > > partition_epoch (or the topic_epoch as of the current KIP) with
>> the
>> > > > offset?
>> > > > > The main purpose of the topic_epoch is to be able to drop
>> > leader_epoch
>> > > > to 0
>> > > > > after a partition is deleted and re-created. I guess you may be
>> > > thinking
>> > > > > about using the partition_epoch to detect that the committed
>> offset
>> > is
>> > > > > invalid? In that case, I am wondering if the alternative approach
>> > > > described
>> > > > > in 60) would be reasonable.
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> 63. There is some subtle coordination between the
>> > LeaderAndIsrRequest
>> > > > and
>> > > > >> UpdateMetadataRequest. Currently, when a leader changes, the
>> > > controller
>> > > > >> first sends the LeaderAndIsrRequest to the assigned replicas and
>> the
>> > > > >> UpdateMetadataRequest to every broker. So, there could be a small
>> > > window
>> > > > >> when the leader already receives the new partition epoch in the
>> > > > >> LeaderAndIsrRequest, but the metadata cache in the broker hasn't
>> > been
>> > > > >> updated with the latest partition epoch. Not sure what's the best
>> > way
>> > > to
>> > > > >> address this issue. Perhaps we can update the metadata cache on
>> the
>> > > > broker
>> > > > >> with both LeaderAndIsrRequest and UpdateMetadataRequest. The
>> > challenge
>> > > > is
>> > > > >> that the two have slightly different data. For example, only the
>> > > latter
>> > > > >> has
>> > > > >> all endpoints.
>> > > > >>
>> > > > >
>> > > > > I am not sure whether this is a problem. Could you explain a bit
>> more
>> > > > what
>> > > > > specific problem this small window can cause?
>> > > > >
>> > > > > Since client can fetch metadata from any broker in the cluster,
>> and
>> > > given
>> > > > > that different brokers receive request (e.g. LeaderAndIsrRequest
>> and
>> > > > > UpdateMetadataRequest) in arbitrary order, the metadata received
>> by
>> > > > client
>> > > > > can be in arbitrary order (either newer or older) compared to the
>> > > > broker's
>> > > > > leadership state even if a given broker receives
>> LeaderAndIsrRequest
>> > > and
>> > > > > UpdateMetadataRequest simultaneously. So I am not sure it is
>> useful
>> > to
>> > > > > update broker's cache with LeaderAndIsrRequest.
>> > > > >
>> > > > >
>> > > > >> 64. The enforcement of leader epoch in Offset commit: We allow a
>> > > > consumer
>> > > > >> to set an arbitrary offset. So it's possible for offsets or
>> leader
>> > > epoch
>> > > > >> to
>> > > > >> go backwards. I am not sure if we could always enforce that the
>> > leader
>> > > > >> epoch only goes up on the broker.
>> > > > >>
>> > > > >
>> > > > > Sure. I have removed this check from the KIP.
>> > > > >
>> > > > > BTW, we can probably still ensure that the leader_epoch always
>> > increase
>> > > > if
>> > > > > the leader_epoch used with offset commit is the max(leader_epoch
>> of
>> > the
>> > > > > message with offset = the committed offset - 1, the largest known
>> > > > > leader_epoch from the metadata). But I don't have a good use-case
>> for
>> > > > this
>> > > > > alternative definition. So I choose the keep the KIP simple by
>> > > requiring
>> > > > > leader_epoch to always increase.
>> > > > >
>> > > > >
>> > > > >> 65. Good point on handling missing partition epoch due to topic
>> > > > deletion.
>> > > > >> Another potential way to address this is to additionally
>> propagate
>> > the
>> > > > >> global partition epoch to brokers and the clients. This way,
>> when a
>> > > > >> partition epoch is missing, we can use the global partition
>> epoch to
>> > > > >> reason
>> > > > >> about which metadata is more recent.
>> > > > >>
>> > > > >
>> > > > > This is a great idea. The global epoch can be used to order the
>> > > metadata
>> > > > > and help us recognize the more recent metadata if a topic (or
>> > > partition)
>> > > > is
>> > > > > deleted and re-created.
>> > > > >
>> > > > > Actually, it seems we only need to propagate the global epoch to
>> > > brokers
>> > > > > and clients without propagating this epoch on a per-topic or
>> > > > per-partition
>> > > > > basic. Doing so would simply interface changes made this KIP. Does
>> > this
>> > > > > approach sound reasonable?
>> > > > >
>> > > > >
>> > > > >> 66. A client may also get an offset by time using the
>> > offsetForTimes()
>> > > > >> api.
>> > > > >> So, we probably want to include offsetInternalMetadata in
>> > > > >> OffsetAndTimestamp
>> > > > >> as well.
>> > > > >>
>> > > > >
>> > > > > You are right. This probably also requires us to change the
>> > > > > ListOffsetRequest as well. I will update the KIP after we agree on
>> > the
>> > > > > solution for 65).
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> 67. InteralMetadata can be a bit confusing with the metadata
>> field
>> > > > already
>> > > > >> there. Perhaps we can just call it OffsetEpoch. It might be
>> useful
>> > to
>> > > > make
>> > > > >> OffsetEpoch printable at least for debugging purpose. Once you do
>> > > that,
>> > > > we
>> > > > >> are already exposing the internal fields. So, not sure if it's
>> worth
>> > > > >> hiding
>> > > > >> them. If we do want to hide them, perhaps we can have sth like
>> the
>> > > > >> following. The binary encoding is probably more efficient than
>> JSON
>> > > for
>> > > > >> external storage.
>> > > > >>
>> > > > >> OffsetEpoch {
>> > > > >>  static OffsetEpoch decode(byte[]);
>> > > > >>
>> > > > >>   public byte[] encode();
>> > > > >>
>> > > > >>   public String toString();
>> > > > >> }
>> > > > >>
>> > > > >
>> > > > > Thanks much. I like this solution. I have updated the KIP
>> > accordingly.
>> > > > >
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> Jun
>> > > > >>
>> > > > >> On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindong28@gmail.com>
>> > wrote:
>> > > > >>
>> > > > >> > Hey Jason,
>> > > > >> >
>> > > > >> > Certainly. This sounds good. I have updated the KIP to clarity
>> > that
>> > > > the
>> > > > >> > global epoch will be incremented by 1 each time a topic is
>> > deleted.
>> > > > >> >
>> > > > >> > Thanks,
>> > > > >> > Dong
>> > > > >> >
>> > > > >> > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <
>> > jason@confluent.io
>> > > >
>> > > > >> > wrote:
>> > > > >> >
>> > > > >> > > Hi Dong,
>> > > > >> > >
>> > > > >> > >
>> > > > >> > > I think your approach will allow user to distinguish between
>> the
>> > > > >> metadata
>> > > > >> > > > before and after the topic deletion. I also agree that this
>> > can
>> > > be
>> > > > >> > > > potentially be useful to user. I am just not very sure
>> whether
>> > > we
>> > > > >> > already
>> > > > >> > > > have a good use-case to make the additional complexity
>> > > worthwhile.
>> > > > >> It
>> > > > >> > > seems
>> > > > >> > > > that this feature is kind of independent of the main
>> problem
>> > of
>> > > > this
>> > > > >> > KIP.
>> > > > >> > > > Could we add this as a future work?
>> > > > >> > >
>> > > > >> > >
>> > > > >> > > Do you think it's fair if we bump the topic epoch on deletion
>> > and
>> > > > >> leave
>> > > > >> > > propagation of the epoch for deleted topics for future work?
>> I
>> > > don't
>> > > > >> > think
>> > > > >> > > this adds much complexity and it makes the behavior
>> consistent:
>> > > > every
>> > > > >> > topic
>> > > > >> > > mutation results in an epoch bump.
>> > > > >> > >
>> > > > >> > > Thanks,
>> > > > >> > > Jason
>> > > > >> > >
>> > > > >> > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <
>> lindong28@gmail.com>
>> > > > wrote:
>> > > > >> > >
>> > > > >> > > > Hey Ismael,
>> > > > >> > > >
>> > > > >> > > > I guess we actually need user to see this field so that
>> user
>> > can
>> > > > >> store
>> > > > >> > > this
>> > > > >> > > > value in the external store together with the offset. We
>> just
>> > > > prefer
>> > > > >> > the
>> > > > >> > > > value to be opaque to discourage most users from
>> interpreting
>> > > this
>> > > > >> > value.
>> > > > >> > > > One more advantage of using such an opaque field is to be
>> able
>> > > to
>> > > > >> > evolve
>> > > > >> > > > the information (or schema) of this value without changing
>> > > > consumer
>> > > > >> API
>> > > > >> > > in
>> > > > >> > > > the future.
>> > > > >> > > >
>> > > > >> > > > I also thinking it is probably OK for user to be able to
>> > > interpret
>> > > > >> this
>> > > > >> > > > value, particularly for those advanced users.
>> > > > >> > > >
>> > > > >> > > > Thanks,
>> > > > >> > > > Dong
>> > > > >> > > >
>> > > > >> > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <
>> > ismael@juma.me.uk>
>> > > > >> wrote:
>> > > > >> > > >
>> > > > >> > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <
>> > > > >> jason@confluent.io>
>> > > > >> > > > > wrote:
>> > > > >> > > > > >
>> > > > >> > > > > > class OffsetAndMetadata {
>> > > > >> > > > > >   long offset;
>> > > > >> > > > > >   byte[] offsetMetadata;
>> > > > >> > > > > >   String metadata;
>> > > > >> > > > > > }
>> > > > >> > > > >
>> > > > >> > > > >
>> > > > >> > > > > > Admittedly, the naming is a bit annoying, but we can
>> > > probably
>> > > > >> come
>> > > > >> > up
>> > > > >> > > > > with
>> > > > >> > > > > > something better. Internally the byte array would have
>> a
>> > > > >> version.
>> > > > >> > If
>> > > > >> > > in
>> > > > >> > > > > the
>> > > > >> > > > > > future we have anything else we need to add, we can
>> update
>> > > the
>> > > > >> > > version
>> > > > >> > > > > and
>> > > > >> > > > > > we wouldn't need any new APIs.
>> > > > >> > > > > >
>> > > > >> > > > >
>> > > > >> > > > > We can also add fields to a class in a compatible way.
>> So,
>> > it
>> > > > >> seems
>> > > > >> > to
>> > > > >> > > me
>> > > > >> > > > > that the main advantage of the byte array is that it's
>> > opaque
>> > > to
>> > > > >> the
>> > > > >> > > > user.
>> > > > >> > > > > Is that correct? If so, we could also add any opaque
>> > metadata
>> > > > in a
>> > > > >> > > > subclass
>> > > > >> > > > > so that users don't even see it (unless they cast it, but
>> > then
>> > > > >> > they're
>> > > > >> > > on
>> > > > >> > > > > their own).
>> > > > >> > > > >
>> > > > >> > > > > Ismael
>> > > > >> > > > >
>> > > > >> > > > > The corresponding seek() and position() APIs might look
>> > > > something
>> > > > >> > like
>> > > > >> > > > > this:
>> > > > >> > > > > >
>> > > > >> > > > > > void seek(TopicPartition partition, long offset, byte[]
>> > > > >> > > > offsetMetadata);
>> > > > >> > > > > > byte[] positionMetadata(TopicPartition partition);
>> > > > >> > > > > >
>> > > > >> > > > > > What do you think?
>> > > > >> > > > > >
>> > > > >> > > > > > Thanks,
>> > > > >> > > > > > Jason
>> > > > >> > > > > >
>> > > > >> > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <
>> > > lindong28@gmail.com
>> > > > >
>> > > > >> > > wrote:
>> > > > >> > > > > >
>> > > > >> > > > > > > Hey Jun, Jason,
>> > > > >> > > > > > >
>> > > > >> > > > > > > Thanks much for all the feedback. I have updated the
>> KIP
>> > > > >> based on
>> > > > >> > > the
>> > > > >> > > > > > > latest discussion. Can you help check whether it
>> looks
>> > > good?
>> > > > >> > > > > > >
>> > > > >> > > > > > > Thanks,
>> > > > >> > > > > > > Dong
>> > > > >> > > > > > >
>> > > > >> > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <
>> > > > lindong28@gmail.com
>> > > > >> >
>> > > > >> > > > wrote:
>> > > > >> > > > > > >
>> > > > >> > > > > > > > Hey Jun,
>> > > > >> > > > > > > >
>> > > > >> > > > > > > > Hmm... thinking about this more, I am not sure that
>> > the
>> > > > >> > proposed
>> > > > >> > > > API
>> > > > >> > > > > is
>> > > > >> > > > > > > > sufficient. For users that store offset
>> externally, we
>> > > > >> probably
>> > > > >> > > > need
>> > > > >> > > > > > > extra
>> > > > >> > > > > > > > API to return the leader_epoch and partition_epoch
>> for
>> > > all
>> > > > >> > > > partitions
>> > > > >> > > > > > > that
>> > > > >> > > > > > > > consumers are consuming. I suppose these users
>> > currently
>> > > > use
>> > > > >> > > > > position()
>> > > > >> > > > > > > to
>> > > > >> > > > > > > > get the offset. Thus we probably need a new method
>> > > > >> > > > > > positionWithEpoch(..)
>> > > > >> > > > > > > to
>> > > > >> > > > > > > > return <offset, partition_epoch, leader_epoch>.
>> Does
>> > > this
>> > > > >> sound
>> > > > >> > > > > > > reasonable?
>> > > > >> > > > > > > >
>> > > > >> > > > > > > > Thanks,
>> > > > >> > > > > > > > Dong
>> > > > >> > > > > > > >
>> > > > >> > > > > > > >
>> > > > >> > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <
>> > > jun@confluent.io
>> > > > >
>> > > > >> > > wrote:
>> > > > >> > > > > > > >
>> > > > >> > > > > > > >> Hi, Dong,
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >> Yes, that's what I am thinking. OffsetEpoch will
>> be
>> > > > >> composed
>> > > > >> > of
>> > > > >> > > > > > > >> (partition_epoch,
>> > > > >> > > > > > > >> leader_epoch).
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >> Thanks,
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >> Jun
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <
>> > > > >> lindong28@gmail.com
>> > > > >> > >
>> > > > >> > > > > wrote:
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >> > Hey Jun,
>> > > > >> > > > > > > >> >
>> > > > >> > > > > > > >> > Thanks much. I like the the new API that you
>> > > proposed.
>> > > > I
>> > > > >> am
>> > > > >> > > not
>> > > > >> > > > > sure
>> > > > >> > > > > > > >> what
>> > > > >> > > > > > > >> > you exactly mean by offset_epoch. I suppose
>> that we
>> > > can
>> > > > >> use
>> > > > >> > > the
>> > > > >> > > > > pair
>> > > > >> > > > > > > of
>> > > > >> > > > > > > >> > (partition_epoch, leader_epoch) as the
>> > offset_epoch,
>> > > > >> right?
>> > > > >> > > > > > > >> >
>> > > > >> > > > > > > >> > Thanks,
>> > > > >> > > > > > > >> > Dong
>> > > > >> > > > > > > >> >
>> > > > >> > > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <
>> > > > >> jun@confluent.io>
>> > > > >> > > > wrote:
>> > > > >> > > > > > > >> >
>> > > > >> > > > > > > >> > > Hi, Dong,
>> > > > >> > > > > > > >> > >
>> > > > >> > > > > > > >> > > Got it. The api that you proposed works. The
>> > > question
>> > > > >> is
>> > > > >> > > > whether
>> > > > >> > > > > > > >> that's
>> > > > >> > > > > > > >> > the
>> > > > >> > > > > > > >> > > api that we want to have in the long term. My
>> > > concern
>> > > > >> is
>> > > > >> > > that
>> > > > >> > > > > > while
>> > > > >> > > > > > > >> the
>> > > > >> > > > > > > >> > api
>> > > > >> > > > > > > >> > > change is simple, the new api seems harder to
>> > > explain
>> > > > >> and
>> > > > >> > > use.
>> > > > >> > > > > For
>> > > > >> > > > > > > >> > example,
>> > > > >> > > > > > > >> > > a consumer storing offsets externally now
>> needs
>> > to
>> > > > call
>> > > > >> > > > > > > >> > > waitForMetadataUpdate() after calling seek().
>> > > > >> > > > > > > >> > >
>> > > > >> > > > > > > >> > > An alternative approach is to make the
>> following
>> > > > >> > compatible
>> > > > >> > > > api
>> > > > >> > > > > > > >> changes
>> > > > >> > > > > > > >> > in
>> > > > >> > > > > > > >> > > Consumer.
>> > > > >> > > > > > > >> > > * Add an additional OffsetEpoch field in
>> > > > >> > OffsetAndMetadata.
>> > > > >> > > > (no
>> > > > >> > > > > > need
>> > > > >> > > > > > > >> to
>> > > > >> > > > > > > >> > > change the CommitSync() api)
>> > > > >> > > > > > > >> > > * Add a new api seek(TopicPartition partition,
>> > long
>> > > > >> > offset,
>> > > > >> > > > > > > >> OffsetEpoch
>> > > > >> > > > > > > >> > > offsetEpoch). We can potentially deprecate the
>> > old
>> > > > api
>> > > > >> > > > > > > >> > seek(TopicPartition
>> > > > >> > > > > > > >> > > partition, long offset) in the future.
>> > > > >> > > > > > > >> > >
>> > > > >> > > > > > > >> > > The alternative approach has similar amount of
>> > api
>> > > > >> changes
>> > > > >> > > as
>> > > > >> > > > > > yours
>> > > > >> > > > > > > >> but
>> > > > >> > > > > > > >> > has
>> > > > >> > > > > > > >> > > the following benefits.
>> > > > >> > > > > > > >> > > 1. The api works in a similar way as how
>> offset
>> > > > >> management
>> > > > >> > > > works
>> > > > >> > > > > > now
>> > > > >> > > > > > > >> and
>> > > > >> > > > > > > >> > is
>> > > > >> > > > > > > >> > > probably what we want in the long term.
>> > > > >> > > > > > > >> > > 2. It can reset offsets better when there is
>> data
>> > > > loss
>> > > > >> due
>> > > > >> > > to
>> > > > >> > > > > > > unclean
>> > > > >> > > > > > > >> > > leader election or correlated replica failure.
>> > > > >> > > > > > > >> > > 3. It can reset offsets better when topic is
>> > > > recreated.
>> > > > >> > > > > > > >> > >
>> > > > >> > > > > > > >> > > Thanks,
>> > > > >> > > > > > > >> > >
>> > > > >> > > > > > > >> > > Jun
>> > > > >> > > > > > > >> > >
>> > > > >> > > > > > > >> > >
>> > > > >> > > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <
>> > > > >> > > lindong28@gmail.com
>> > > > >> > > > >
>> > > > >> > > > > > > wrote:
>> > > > >> > > > > > > >> > >
>> > > > >> > > > > > > >> > > > Hey Jun,
>> > > > >> > > > > > > >> > > >
>> > > > >> > > > > > > >> > > > Yeah I agree that ideally we don't want an
>> ever
>> > > > >> growing
>> > > > >> > > > global
>> > > > >> > > > > > > >> metadata
>> > > > >> > > > > > > >> > > > version. I just think it may be more
>> desirable
>> > to
>> > > > >> keep
>> > > > >> > the
>> > > > >> > > > > > > consumer
>> > > > >> > > > > > > >> API
>> > > > >> > > > > > > >> > > > simple.
>> > > > >> > > > > > > >> > > >
>> > > > >> > > > > > > >> > > > In my current proposal, metadata version
>> > returned
>> > > > in
>> > > > >> the
>> > > > >> > > > fetch
>> > > > >> > > > > > > >> response
>> > > > >> > > > > > > >> > > > will be stored with the offset together.
>> More
>> > > > >> > > specifically,
>> > > > >> > > > > the
>> > > > >> > > > > > > >> > > > metadata_epoch in the new offset topic
>> schema
>> > > will
>> > > > be
>> > > > >> > the
>> > > > >> > > > > > largest
>> > > > >> > > > > > > >> > > > metadata_epoch from all the MetadataResponse
>> > and
>> > > > >> > > > FetchResponse
>> > > > >> > > > > > > ever
>> > > > >> > > > > > > >> > > > received by this consumer.
>> > > > >> > > > > > > >> > > >
>> > > > >> > > > > > > >> > > > We probably don't have to change the
>> consumer
>> > API
>> > > > for
>> > > > >> > > > > > > >> > > > commitSync(Map<TopicPartition,
>> > > OffsetAndMetadata>).
>> > > > >> If
>> > > > >> > > user
>> > > > >> > > > > > calls
>> > > > >> > > > > > > >> > > > commitSync(...) to commit offset 10 for a
>> given
>> > > > >> > partition,
>> > > > >> > > > for
>> > > > >> > > > > > > most
>> > > > >> > > > > > > >> > > > use-cases, this consumer instance should
>> have
>> > > > >> consumed
>> > > > >> > > > message
>> > > > >> > > > > > > with
>> > > > >> > > > > > > >> > > offset
>> > > > >> > > > > > > >> > > > 9 from this partition, in which case the
>> > consumer
>> > > > can
>> > > > >> > > > remember
>> > > > >> > > > > > and
>> > > > >> > > > > > > >> use
>> > > > >> > > > > > > >> > > the
>> > > > >> > > > > > > >> > > > metadata_epoch from the corresponding
>> > > FetchResponse
>> > > > >> when
>> > > > >> > > > > > > committing
>> > > > >> > > > > > > >> > > offset.
>> > > > >> > > > > > > >> > > > If user calls commitSync(..) to commit
>> offset
>> > 10
>> > > > for
>> > > > >> a
>> > > > >> > > given
>> > > > >> > > > > > > >> partition
>> > > > >> > > > > > > >> > > > without having consumed the message with
>> > offset 9
>> > > > >> using
>> > > > >> > > this
>> > > > >> > > > > > > >> consumer
>> > > > >> > > > > > > >> > > > instance, this is probably an advanced
>> > use-case.
>> > > In
>> > > > >> this
>> > > > >> > > > case
>> > > > >> > > > > > the
>> > > > >> > > > > > > >> > > advanced
>> > > > >> > > > > > > >> > > > user can retrieve the metadata_epoch using
>> the
>> > > > newly
>> > > > >> > added
>> > > > >> > > > > > > >> > > metadataEpoch()
>> > > > >> > > > > > > >> > > > API after it fetches the message with
>> offset 9
>> > > > >> (probably
>> > > > >> > > > from
>> > > > >> > > > > > > >> another
>> > > > >> > > > > > > >> > > > consumer instance) and encode this
>> > metadata_epoch
>> > > > in
>> > > > >> the
>> > > > >> > > > > > > >> > > > string OffsetAndMetadata.metadata. Do you
>> think
>> > > > this
>> > > > >> > > > solution
>> > > > >> > > > > > > would
>> > > > >> > > > > > > >> > work?
>> > > > >> > > > > > > >> > > >
>> > > > >> > > > > > > >> > > > By "not sure that I fully understand your
>> > latest
>> > > > >> > > > suggestion",
>> > > > >> > > > > > are
>> > > > >> > > > > > > >> you
>> > > > >> > > > > > > >> > > > referring to solution related to unclean
>> leader
>> > > > >> election
>> > > > >> > > > using
>> > > > >> > > > > > > >> > > leader_epoch
>> > > > >> > > > > > > >> > > > in my previous email?
>> > > > >> > > > > > > >> > > >
>> > > > >> > > > > > > >> > > > Thanks,
>> > > > >> > > > > > > >> > > > Dong
>> > > > >> > > > > > > >> > > >
>> > > > >> > > > > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <
>> > > > >> > jun@confluent.io
>> > > > >> > > >
>> > > > >> > > > > > wrote:
>> > > > >> > > > > > > >> > > >
>> > > > >> > > > > > > >> > > > > Hi, Dong,
>> > > > >> > > > > > > >> > > > >
>> > > > >> > > > > > > >> > > > > Not sure that I fully understand your
>> latest
>> > > > >> > suggestion.
>> > > > >> > > > > > > >> Returning an
>> > > > >> > > > > > > >> > > > ever
>> > > > >> > > > > > > >> > > > > growing global metadata version itself is
>> no
>> > > > ideal,
>> > > > >> > but
>> > > > >> > > is
>> > > > >> > > > > > fine.
>> > > > >> > > > > > > >> My
>> > > > >> > > > > > > >> > > > > question is whether the metadata version
>> > > returned
>> > > > >> in
>> > > > >> > the
>> > > > >> > > > > fetch
>> > > > >> > > > > > > >> > response
>> > > > >> > > > > > > >> > > > > needs to be stored with the offset
>> together
>> > if
>> > > > >> offsets
>> > > > >> > > are
>> > > > >> > > > > > > stored
>> > > > >> > > > > > > >> > > > > externally. If so, we also have to change
>> the
>> > > > >> consumer
>> > > > >> > > API
>> > > > >> > > > > for
>> > > > >> > > > > > > >> > > > commitSync()
>> > > > >> > > > > > > >> > > > > and need to worry about compatibility. If
>> we
>> > > > don't
>> > > > >> > store
>> > > > >> > > > the
>> > > > >> > > > > > > >> metadata
>> > > > >> > > > > > > >> > > > > version together with the offset, on a
>> > consumer
>> > > > >> > restart,
>> > > > >> > > > > it's
>> > > > >> > > > > > > not
>> > > > >> > > > > > > >> > clear
>> > > > >> > > > > > > >> > > > how
>> > > > >> > > > > > > >> > > > > we can ensure the metadata in the
>> consumer is
>> > > > high
>> > > > >> > > enough
>> > > > >> > > > > > since
>> > > > >> > > > > > > >> there
>> > > > >> > > > > > > >> > > is
>> > > > >> > > > > > > >> > > > no
>> > > > >> > > > > > > >> > > > > metadata version to compare with.
>> > > > >> > > > > > > >> > > > >
>> > > > >> > > > > > > >> > > > > Thanks,
>> > > > >> > > > > > > >> > > > >
>> > > > >> > > > > > > >> > > > > Jun
>> > > > >> > > > > > > >> > > > >
>> > > > >> > > > > > > >> > > > >
>> > > > >> > > > > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <
>> > > > >> > > > > lindong28@gmail.com
>> > > > >> > > > > > >
>> > > > >> > > > > > > >> > wrote:
>> > > > >> > > > > > > >> > > > >
>> > > > >> > > > > > > >> > > > > > Hey Jun,
>> > > > >> > > > > > > >> > > > > >
>> > > > >> > > > > > > >> > > > > > Thanks much for the explanation.
>> > > > >> > > > > > > >> > > > > >
>> > > > >> > > > > > > >> > > > > > I understand the advantage of
>> > partition_epoch
>> > > > >> over
>> > > > >> > > > > > > >> metadata_epoch.
>> > > > >> > > > > > > >> > My
>> > > > >> > > > > > > >> > > > > > current concern is that the use of
>> > > leader_epoch
>> > > > >> and
>> > > > >> > > the
>> > > > >> > > > > > > >> > > partition_epoch
>> > > > >> > > > > > > >> > > > > > requires us considerable change to
>> > consumer's
>> > > > >> public
>> > > > >> > > API
>> > > > >> > > > > to
>> > > > >> > > > > > > take
>> > > > >> > > > > > > >> > care
>> > > > >> > > > > > > >> > > > of
>> > > > >> > > > > > > >> > > > > > the case where user stores offset
>> > externally.
>> > > > For
>> > > > >> > > > example,
>> > > > >> > > > > > > >> > > *consumer*.
>> > > > >> > > > > > > >> > > > > > *commitSync*(..) would have to take a
>> map
>> > > whose
>> > > > >> > value
>> > > > >> > > is
>> > > > >> > > > > > > >> <offset,
>> > > > >> > > > > > > >> > > > > metadata,
>> > > > >> > > > > > > >> > > > > > leader epoch, partition epoch>.
>> > > > >> > *consumer*.*seek*(...)
>> > > > >> > > > > would
>> > > > >> > > > > > > >> also
>> > > > >> > > > > > > >> > > need
>> > > > >> > > > > > > >> > > > > > leader_epoch and partition_epoch as
>> > > parameter.
>> > > > >> > > > Technically
>> > > > >> > > > > > we
>> > > > >> > > > > > > >> can
>> > > > >> > > > > > > >> > > > > probably
>> > > > >> > > > > > > >> > > > > > still make it work in a backward
>> compatible
>> > > > >> manner
>> > > > >> > > after
>> > > > >> > > > > > > careful
>> > > > >> > > > > > > >> > > design
>> > > > >> > > > > > > >> > > > > and
>> > > > >> > > > > > > >> > > > > > discussion. But these changes can make
>> the
>> > > > >> > consumer's
>> > > > >> > > > > > > interface
>> > > > >> > > > > > > >> > > > > > unnecessarily complex for more users
>> who do
>> > > not
>> > > > >> > store
>> > > > >> > > > > offset
>> > > > >> > > > > > > >> > > > externally.
>> > > > >> > > > > > > >> > > > > >
>> > > > >> > > > > > > >> > > > > > After thinking more about it, we can
>> > address
>> > > > all
>> > > > >> > > > problems
>> > > > >> > > > > > > >> discussed
>> > > > >> > > > > > > >> > > by
>> > > > >> > > > > > > >> > > > > only
>> > > > >> > > > > > > >> > > > > > using the metadata_epoch without
>> > introducing
>> > > > >> > > > leader_epoch
>> > > > >> > > > > or
>> > > > >> > > > > > > the
>> > > > >> > > > > > > >> > > > > > partition_epoch. The current KIP
>> describes
>> > > the
>> > > > >> > changes
>> > > > >> > > > to
>> > > > >> > > > > > the
>> > > > >> > > > > > > >> > > consumer
>> > > > >> > > > > > > >> > > > > API
>> > > > >> > > > > > > >> > > > > > and how the new API can be used if user
>> > > stores
>> > > > >> > offset
>> > > > >> > > > > > > >> externally.
>> > > > >> > > > > > > >> > In
>> > > > >> > > > > > > >> > > > > order
>> > > > >> > > > > > > >> > > > > > to address the scenario you described
>> > > earlier,
>> > > > we
>> > > > >> > can
>> > > > >> > > > > > include
>> > > > >> > > > > > > >> > > > > > metadata_epoch in the FetchResponse and
>> the
>> > > > >> > > > > > > LeaderAndIsrRequest.
>> > > > >> > > > > > > >> > > > Consumer
>> > > > >> > > > > > > >> > > > > > remembers the largest metadata_epoch
>> from
>> > all
>> > > > the
>> > > > >> > > > > > > FetchResponse
>> > > > >> > > > > > > >> it
>> > > > >> > > > > > > >> > > has
>> > > > >> > > > > > > >> > > > > > received. The metadata_epoch committed
>> with
>> > > the
>> > > > >> > > offset,
>> > > > >> > > > > > either
>> > > > >> > > > > > > >> > within
>> > > > >> > > > > > > >> > > > or
>> > > > >> > > > > > > >> > > > > > outside Kafka, should be the largest
>> > > > >> metadata_epoch
>> > > > >> > > > across
>> > > > >> > > > > > all
>> > > > >> > > > > > > >> > > > > > FetchResponse and MetadataResponse ever
>> > > > received
>> > > > >> by
>> > > > >> > > this
>> > > > >> > > > > > > >> consumer.
>> > > > >> > > > > > > >> > > > > >
>> > > > >> > > > > > > >> > > > > > The drawback of using only the
>> > metadata_epoch
>> > > > is
>> > > > >> > that
>> > > > >> > > we
>> > > > >> > > > > can
>> > > > >> > > > > > > not
>> > > > >> > > > > > > >> > > always
>> > > > >> > > > > > > >> > > > > do
>> > > > >> > > > > > > >> > > > > > the smart offset reset in case of
>> unclean
>> > > > leader
>> > > > >> > > > election
>> > > > >> > > > > > > which
>> > > > >> > > > > > > >> you
>> > > > >> > > > > > > >> > > > > > mentioned earlier. But in most case,
>> > unclean
>> > > > >> leader
>> > > > >> > > > > election
>> > > > >> > > > > > > >> > probably
>> > > > >> > > > > > > >> > > > > > happens when consumer is not
>> > > > >> rebalancing/restarting.
>> > > > >> > > In
>> > > > >> > > > > > these
>> > > > >> > > > > > > >> > cases,
>> > > > >> > > > > > > >> > > > > either
>> > > > >> > > > > > > >> > > > > > consumer is not directly affected by
>> > unclean
>> > > > >> leader
>> > > > >> > > > > election
>> > > > >> > > > > > > >> since
>> > > > >> > > > > > > >> > it
>> > > > >> > > > > > > >> > > > is
>> > > > >> > > > > > > >> > > > > > not consuming from the end of the log,
>> or
>> > > > >> consumer
>> > > > >> > can
>> > > > >> > > > > > derive
>> > > > >> > > > > > > >> the
>> > > > >> > > > > > > >> > > > > > leader_epoch from the most recent
>> message
>> > > > >> received
>> > > > >> > > > before
>> > > > >> > > > > it
>> > > > >> > > > > > > >> sees
>> > > > >> > > > > > > >> > > > > > OffsetOutOfRangeException. So I am not
>> sure
>> > > it
>> > > > is
>> > > > >> > > worth
>> > > > >> > > > > > adding
>> > > > >> > > > > > > >> the
>> > > > >> > > > > > > >> > > > > > leader_epoch to consumer API to address
>> the
>> > > > >> > remaining
>> > > > >> > > > > corner
>> > > > >> > > > > > > >> case.
>> > > > >> > > > > > > >> > > What
>> > > > >> > > > > > > >> > > > > do
>> > > > >> > > > > > > >> > > > > > you think?
>> > > > >> > > > > > > >> > > > > >
>> > > > >> > > > > > > >> > > > > > Thanks,
>> > > > >> > > > > > > >> > > > > > Dong
>> > > > >> > > > > > > >> > > > > >
>> > > > >> > > > > > > >> > > > > >
>> > > > >> > > > > > > >> > > > > >
>> > > > >> > > > > > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao
>> <
>> > > > >> > > > jun@confluent.io
>> > > > >> > > > > >
>> > > > >> > > > > > > >> wrote:
>> > > > >> > > > > > > >> > > > > >
>> > > > >> > > > > > > >> > > > > > > Hi, Dong,
>> > > > >> > > > > > > >> > > > > > >
>> > > > >> > > > > > > >> > > > > > > Thanks for the reply.
>> > > > >> > > > > > > >> > > > > > >
>> > > > >> > > > > > > >> > > > > > > To solve the topic recreation issue,
>> we
>> > > could
>> > > > >> use
>> > > > >> > > > > either a
>> > > > >> > > > > > > >> global
>> > > > >> > > > > > > >> > > > > > metadata
>> > > > >> > > > > > > >> > > > > > > version or a partition level epoch.
>> But
>> > > > either
>> > > > >> one
>> > > > >> > > > will
>> > > > >> > > > > > be a
>> > > > >> > > > > > > >> new
>> > > > >> > > > > > > >> > > > > concept,
>> > > > >> > > > > > > >> > > > > > > right? To me, the latter seems more
>> > > natural.
>> > > > It
>> > > > >> > also
>> > > > >> > > > > makes
>> > > > >> > > > > > > it
>> > > > >> > > > > > > >> > > easier
>> > > > >> > > > > > > >> > > > to
>> > > > >> > > > > > > >> > > > > > > detect if a consumer's offset is still
>> > > valid
>> > > > >> > after a
>> > > > >> > > > > topic
>> > > > >> > > > > > > is
>> > > > >> > > > > > > >> > > > > recreated.
>> > > > >> > > > > > > >> > > > > > As
>> > > > >> > > > > > > >> > > > > > > you pointed out, we don't need to
>> store
>> > the
>> > > > >> > > partition
>> > > > >> > > > > > epoch
>> > > > >> > > > > > > in
>> > > > >> > > > > > > >> > the
>> > > > >> > > > > > > >> > > > > > message.
>> > > > >> > > > > > > >> > > > > > > The following is what I am thinking.
>> > When a
>> > > > >> > > partition
>> > > > >> > > > is
>> > > > >> > > > > > > >> created,
>> > > > >> > > > > > > >> > > we
>> > > > >> > > > > > > >> > > > > can
>> > > > >> > > > > > > >> > > > > > > assign a partition epoch from an
>> > > > >> ever-increasing
>> > > > >> > > > global
>> > > > >> > > > > > > >> counter
>> > > > >> > > > > > > >> > and
>> > > > >> > > > > > > >> > > > > store
>> > > > >> > > > > > > >> > > > > > > it in /brokers/topics/[topic]/
>> > > > >> > > > partitions/[partitionId]
>> > > > >> > > > > in
>> > > > >> > > > > > > ZK.
>> > > > >> > > > > > > >> > The
>> > > > >> > > > > > > >> > > > > > > partition
>> > > > >> > > > > > > >> > > > > > > epoch is propagated to every broker.
>> The
>> > > > >> consumer
>> > > > >> > > will
>> > > > >> > > > > be
>> > > > >> > > > > > > >> > tracking
>> > > > >> > > > > > > >> > > a
>> > > > >> > > > > > > >> > > > > > tuple
>> > > > >> > > > > > > >> > > > > > > of <offset, leader epoch, partition
>> > epoch>
>> > > > for
>> > > > >> > > > offsets.
>> > > > >> > > > > > If a
>> > > > >> > > > > > > >> > topic
>> > > > >> > > > > > > >> > > is
>> > > > >> > > > > > > >> > > > > > > recreated, it's possible that a
>> > consumer's
>> > > > >> offset
>> > > > >> > > and
>> > > > >> > > > > > leader
>> > > > >> > > > > > > >> > epoch
>> > > > >> > > > > > > >> > > > > still
>> > > > >> > > > > > > >> > > > > > > match that in the broker, but
>> partition
>> > > epoch
>> > > > >> > won't
>> > > > >> > > > be.
>> > > > >> > > > > In
>> > > > >> > > > > > > >> this
>> > > > >> > > > > > > >> > > case,
>> > > > >> > > > > > > >> > > > > we
>> > > > >> > > > > > > >> > > > > > > can potentially still treat the
>> > consumer's
>> > > > >> offset
>> > > > >> > as
>> > > > >> > > > out
>> > > > >> > > > > > of
>> > > > >> > > > > > > >> range
>> > > > >> > > > > > > >> > > and
>> > > > >> > > > > > > >> > > > > > reset
>> > > > >> > > > > > > >> > > > > > > the offset based on the offset reset
>> > policy
>> > > > in
>> > > > >> the
>> > > > >> > > > > > consumer.
>> > > > >> > > > > > > >> This
>> > > > >> > > > > > > >> > > > seems
>> > > > >> > > > > > > >> > > > > > > harder to do with a global metadata
>> > > version.
>> > > > >> > > > > > > >> > > > > > >
>> > > > >> > > > > > > >> > > > > > > Jun
>> > > > >> > > > > > > >> > > > > > >
>> > > > >> > > > > > > >> > > > > > >
>> > > > >> > > > > > > >> > > > > > >
>> > > > >> > > > > > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong
>> > Lin <
>> > > > >> > > > > > > >> lindong28@gmail.com>
>> > > > >> > > > > > > >> > > > wrote:
>> > > > >> > > > > > > >> > > > > > >
>> > > > >> > > > > > > >> > > > > > > > Hey Jun,
>> > > > >> > > > > > > >> > > > > > > >
>> > > > >> > > > > > > >> > > > > > > > This is a very good example. After
>> > > thinking
>> > > > >> > > through
>> > > > >> > > > > this
>> > > > >> > > > > > > in
>> > > > >> > > > > > > >> > > > detail, I
>> > > > >> > > > > > > >> > > > > > > agree
>> > > > >> > > > > > > >> > > > > > > > that we need to commit offset with
>> > leader
>> > > > >> epoch
>> > > > >> > in
>> > > > >> > > > > order
>> > > > >> > > > > > > to
>> > > > >> > > > > > > >> > > address
>> > > > >> > > > > > > >> > > > > > this
>> > > > >> > > > > > > >> > > > > > > > example.
>> > > > >> > > > > > > >> > > > > > > >
>> > > > >> > > > > > > >> > > > > > > > I think the remaining question is
>> how
>> > to
>> > > > >> address
>> > > > >> > > the
>> > > > >> > > > > > > >> scenario
>> > > > >> > > > > > > >> > > that
>> > > > >> > > > > > > >> > > > > the
>> > > > >> > > > > > > >> > > > > > > > topic is deleted and re-created. One
>> > > > possible
>> > > > >> > > > solution
>> > > > >> > > > > > is
>> > > > >> > > > > > > to
>> > > > >> > > > > > > >> > > commit
>> > > > >> > > > > > > >> > > > > > > offset
>> > > > >> > > > > > > >> > > > > > > > with both the leader epoch and the
>> > > metadata
>> > > > >> > > version.
>> > > > >> > > > > The
>> > > > >> > > > > > > >> logic
>> > > > >> > > > > > > >> > > and
>> > > > >> > > > > > > >> > > > > the
>> > > > >> > > > > > > >> > > > > > > > implementation of this solution does
>> > not
>> > > > >> > require a
>> > > > >> > > > new
>> > > > >> > > > > > > >> concept
>> > > > >> > > > > > > >> > > > (e.g.
>> > > > >> > > > > > > >> > > > > > > > partition epoch) and it does not
>> > require
>> > > > any
>> > > > >> > > change
>> > > > >> > > > to
>> > > > >> > > > > > the
>> > > > >> > > > > > > >> > > message
>> > > > >> > > > > > > >> > > > > > format
>> > > > >> > > > > > > >> > > > > > > > or leader epoch. It also allows us
>> to
>> > > order
>> > > > >> the
>> > > > >> > > > > metadata
>> > > > >> > > > > > > in
>> > > > >> > > > > > > >> a
>> > > > >> > > > > > > >> > > > > > > > straightforward manner which may be
>> > > useful
>> > > > in
>> > > > >> > the
>> > > > >> > > > > > future.
>> > > > >> > > > > > > >> So it
>> > > > >> > > > > > > >> > > may
>> > > > >> > > > > > > >> > > > > be
>> > > > >> > > > > > > >> > > > > > a
>> > > > >> > > > > > > >> > > > > > > > better solution than generating a
>> > random
>> > > > >> > partition
>> > > > >> > > > > epoch
>> > > > >> > > > > > > >> every
>> > > > >> > > > > > > >> > > time
>> > > > >> > > > > > > >> > > > > we
>> > > > >> > > > > > > >> > > > > > > > create a partition. Does this sound
>> > > > >> reasonable?
>> > > > >> > > > > > > >> > > > > > > >
>> > > > >> > > > > > > >> > > > > > > > Previously one concern with using
>> the
>> > > > >> metadata
>> > > > >> > > > version
>> > > > >> > > > > > is
>> > > > >> > > > > > > >> that
>> > > > >> > > > > > > >> > > > > consumer
>> > > > >> > > > > > > >> > > > > > > > will be forced to refresh metadata
>> even
>> > > if
>> > > > >> > > metadata
>> > > > >> > > > > > > version
>> > > > >> > > > > > > >> is
>> > > > >> > > > > > > >> > > > > > increased
>> > > > >> > > > > > > >> > > > > > > > due to topics that the consumer is
>> not
>> > > > >> > interested
>> > > > >> > > > in.
>> > > > >> > > > > > Now
>> > > > >> > > > > > > I
>> > > > >> > > > > > > >> > > > realized
>> > > > >> > > > > > > >> > > > > > that
>> > > > >> > > > > > > >> > > > > > > > this is probably not a problem.
>> > Currently
>> > > > >> client
>> > > > >> > > > will
>> > > > >> > > > > > > >> refresh
>> > > > >> > > > > > > >> > > > > metadata
>> > > > >> > > > > > > >> > > > > > > > either due to
>> InvalidMetadataException
>> > in
>> > > > the
>> > > > >> > > > response
>> > > > >> > > > > > > from
>> > > > >> > > > > > > >> > > broker
>> > > > >> > > > > > > >> > > > or
>> > > > >> > > > > > > >> > > > > > due
>> > > > >> > > > > > > >> > > > > > > > to metadata expiry. The addition of
>> the
>> > > > >> metadata
>> > > > >> > > > > version
>> > > > >> > > > > > > >> should
>> > > > >> > > > > > > >> > > > > > increase
>> > > > >> > > > > > > >> > > > > > > > the overhead of metadata refresh
>> caused
>> > > by
>> > > > >> > > > > > > >> > > > InvalidMetadataException.
>> > > > >> > > > > > > >> > > > > If
>> > > > >> > > > > > > >> > > > > > > > client refresh metadata due to
>> expiry
>> > and
>> > > > it
>> > > > >> > > > receives
>> > > > >> > > > > a
>> > > > >> > > > > > > >> > metadata
>> > > > >> > > > > > > >> > > > > whose
>> > > > >> > > > > > > >> > > > > > > > version is lower than the current
>> > > metadata
>> > > > >> > > version,
>> > > > >> > > > we
>> > > > >> > > > > > can
>> > > > >> > > > > > > >> > reject
>> > > > >> > > > > > > >> > > > the
>> > > > >> > > > > > > >> > > > > > > > metadata but still reset the
>> metadata
>> > > age,
>> > > > >> which
>> > > > >> > > > > > > essentially
>> > > > >> > > > > > > >> > keep
>> > > > >> > > > > > > >> > > > the
>> > > > >> > > > > > > >> > > > > > > > existing behavior in the client.
>> > > > >> > > > > > > >> > > > > > > >
>> > > > >> > > > > > > >> > > > > > > > Thanks much,
>> > > > >> > > > > > > >> > > > > > > > Dong
>> > > > >> > > > > > > >> > > > > > > >
>> > > > >> > > > > > > >> > > > > > >
>> > > > >> > > > > > > >> > > > > >
>> > > > >> > > > > > > >> > > > >
>> > > > >> > > > > > > >> > > >
>> > > > >> > > > > > > >> > >
>> > > > >> > > > > > > >> >
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >
>> > > > >> > > > > > > >
>> > > > >> > > > > > >
>> > > > >> > > > > >
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message