kafka-dev mailing list archives

From Becket Qin <becket....@gmail.com>
Subject Re: [VOTE] KIP-192 : Provide cleaner semantics when idempotence is enabled
Date Mon, 11 Sep 2017 21:39:28 GMT
Hi Apurva,

Sorry for being late on this thread. I am trying to understand the
implementation of the case where we will throw DuplicateSequenceException. My
understanding is the following:
1. On the broker side, we will cache the 5 most recent
sequence/timestamp/offset (STO) entries for each producer ID.
2. When a duplicate occurs and the producer has max.in.flight.requests set to
<=5, we can return the timestamp/offset from the cached STO.
3. When a duplicate occurs and the producer has max.in.flight.requests
greater than 5, we may need to return DuplicateSequenceException just to
indicate the sequence is a duplicate, but the ProduceResponse will not have
timestamp/offset because it may be outside the 5 entries in the cache.
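
As a rough illustration of the three points above, the per-producer cache
could be sketched in Java as follows. The class and method names are
hypothetical and are not the broker's actual code; sequence wraparound and
per-partition state are ignored to keep the sketch short.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical sketch of a per-producer cache of recent batch metadata.
final class RecentBatchCache {
    // One cached sequence/timestamp/offset (STO) entry.
    static final class Sto {
        final int sequence;
        final long timestamp;
        final long offset;

        Sto(int sequence, long timestamp, long offset) {
            this.sequence = sequence;
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    // The fixed cache depth discussed in this thread.
    static final int MAX_ENTRIES = 5;

    private final Deque<Sto> entries = new ArrayDeque<>();
    private int lastSequence = -1;

    // Record a newly appended batch, dropping the oldest entry when full.
    void append(Sto sto) {
        if (entries.size() == MAX_ENTRIES) {
            entries.removeFirst();
        }
        entries.addLast(sto);
        lastSequence = sto.sequence;
    }

    // A sequence at or below the last appended one has been seen before.
    boolean isDuplicate(int sequence) {
        return sequence <= lastSequence;
    }

    // Present: the duplicate can be answered with the original
    // timestamp/offset. Empty: the metadata has already been evicted.
    Optional<Sto> lookup(int sequence) {
        return entries.stream().filter(e -> e.sequence == sequence).findFirst();
    }
}
```

With more than 5 batches in flight, a retried batch can be a duplicate whose
entry has already been evicted, which is the case in point 3.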

Is my understanding correct? If this is the current implementation, I have a
few concerns:
1. One potential issue is that if there are a lot of producers
producing to the same cluster (e.g. a Map-Reduce job) we may still spend a
lot of memory on caching the most recent STO.
2. In most cases, we are essentially caching sequence/timestamp/offset
entries that may never be used again.

Since we are making protocol changes, I am wondering if we can improve the
above two cases by doing the following:
1. Add a per partition LastAckedSequence field in the ProduceRequest.
2. The broker will remove the cached STO entries whose sequence is less
than or equal to LastAckedSequence for each Partition/PID.
3. Have a global STO cache to cap the number of total cached STO entries to
some number, say 1 million. This total number is shared by all the
producers. If this number is reached, we will remove entries from the
producer that has the most cached STO entries.
4. If there is a sequence smaller than the last sequence and there is no
entry in the STO cache, we return DuplicateSequenceException without
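
A minimal Java sketch of the proposed cache, under stated assumptions: the
class name, method names, and the string key standing in for a
Partition/PID pair are all hypothetical. Evicting the oldest entry of the
largest producer is one possible reading of point 3, and the linear scan for
the largest producer is only for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: STO cache pruned by LastAckedSequence, with a global cap.
final class GlobalStoCache {
    private final int maxTotalEntries;
    private int totalEntries = 0;
    // key (stand-in for Partition/PID) -> sequence -> {timestamp, offset}
    private final Map<String, TreeMap<Integer, long[]>> byProducer = new HashMap<>();

    GlobalStoCache(int maxTotalEntries) {
        if (maxTotalEntries < 1) {
            throw new IllegalArgumentException("maxTotalEntries must be >= 1");
        }
        this.maxTotalEntries = maxTotalEntries;
    }

    // Cache the metadata of a newly appended batch, then enforce the cap.
    void append(String key, int sequence, long timestamp, long offset) {
        TreeMap<Integer, long[]> entries =
                byProducer.computeIfAbsent(key, k -> new TreeMap<>());
        if (entries.put(sequence, new long[] {timestamp, offset}) == null) {
            totalEntries++;
        }
        while (totalEntries > maxTotalEntries) {
            evictFromLargest();
        }
    }

    // Drop every entry the producer has already seen acknowledged.
    void onLastAckedSequence(String key, int lastAckedSequence) {
        TreeMap<Integer, long[]> entries = byProducer.get(key);
        if (entries == null) {
            return;
        }
        Map<Integer, long[]> acked = entries.headMap(lastAckedSequence, true);
        totalEntries -= acked.size();
        acked.clear();
    }

    // Returns {timestamp, offset}, or null when no entry is cached.
    long[] lookup(String key, int sequence) {
        TreeMap<Integer, long[]> entries = byProducer.get(key);
        return entries == null ? null : entries.get(sequence);
    }

    int size() {
        return totalEntries;
    }

    // Remove the oldest entry from the key that holds the most entries.
    private void evictFromLargest() {
        TreeMap<Integer, long[]> largest = null;
        for (TreeMap<Integer, long[]> candidate : byProducer.values()) {
            if (largest == null || candidate.size() > largest.size()) {
                largest = candidate;
            }
        }
        largest.pollFirstEntry();
        totalEntries--;
    }
}
```

A null result from lookup for a sequence below the last appended one
corresponds to the case in point 4.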

With the above changes, we can have the following benefits:
1. avoid caching the sequence/timestamp/offset unnecessarily because all
the cached entries are the entries that haven't been confirmed by the
2. no magic number of 5 for max.in.flight.requests.per.connection
3. bounded memory footprint on the cached sequence/timestamp/offset entries.

Hope it's not too late to have the changes if that makes sense.


Jiangjie (Becket) Qin

On Mon, Sep 11, 2017 at 11:21 AM, Apurva Mehta <apurva@confluent.io> wrote:

> Thanks for the votes everyone.
> One of the proposals here was to raise a 'DuplicateSequenceException' to
> the user if the broker detected that one of the internal retries resulted
> in the duplicate, and the metadata for the original batch was no longer
> cached.
> However, when implementing this change, I realized that this is quite
> unintuitive from the user's point of view. In reality, the 'duplicate' is
> only due to internal retries -- something that the user has no visibility
> into. And secondly, this is not an error: the batch has been persisted,
> only the cached metadata has been lost.
> I think the better approach is to return a 'success' but make it clear
> that there is no record metadata. If the user tries to access
> `RecordMetadata.offset` or `RecordMetadata.timestamp` methods of the
> returned metadata, we can raise a 'NoMetadataAvailableException' or
> something like that.
> This way users who don't access the 'offset' and 'timestamp' fields would
> not notice a change. For the users who do, the offset and timestamp will
> not silently be invalid: they will be notified through an exception.
> This seems like the cleanest way forward and I would like to make this
> small change to the KIP.
> Does anybody have any objections?
> Thanks,
> Apurva
> On Thu, Sep 7, 2017 at 9:44 PM, Apurva Mehta <apurva@confluent.io> wrote:
> > Thanks for the comments Ismael.
> >
> > I have gone ahead and incorporated all your suggestions in the KIP
> > document. You convinced me on adding max.message.bytes :)
> >
> > Apurva
> >
> > On Thu, Sep 7, 2017 at 6:12 PM, Ismael Juma <ismael@juma.me.uk> wrote:
> >
> >> Thanks for the KIP. +1 (binding) from me. A few minor comments:
> >>
> >> 1. We should add a note to the backwards compatibility section
> >> explaining the impact of throwing DuplicateSequenceException (a new
> >> exception) from `send`. As I understand it, it's not an issue, but good
> >> to include it in the KIP.
> >>
> >> 2. For clarity, it's good to highlight in some way the new fields in the
> >> protocol definition itself
> >>
> >> 3. I understand that you decided not to add max.message.bytes because
> >> it's unrelated to this KIP. I'll try to persuade you that we should, but
> >> it's not a blocker if you don't agree. The reasons are: 1. The
> >> implementation effort to add it is minimal since it's a topic config like
> >> message format version, 2. It's clearly beneficial for the producer to
> >> have that information, 3. It's compact (just a number), 4. It's nice to
> >> avoid another protocol bump for a small change like that.
> >>
> >> Thanks,
> >> Ismael
> >>
> >> On Thu, Sep 7, 2017 at 9:51 PM, Apurva Mehta <apurva@confluent.io> wrote:
> >>
> >> > Hi,
> >> >
> >> > I'd like to start a vote for KIP-192:
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-192+%3A+Provide+cleaner+semantics+when+idempotence+is+enabled
> >> >
> >> > Thanks,
> >> > Apurva
> >> >
> >>
> >
> >
