kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [VOTE] KIP-280: Enhanced log compaction
Date Mon, 04 Jun 2018 16:45:34 GMT
Thanks Luís for working on this KIP. I'm +1.

One note: there is another KIP in progress, KIP-228, which allows negative
timestamps (I'm cc'ing the contributor of that KIP as well). One related
issue from KIP-228 is the handling of "-1": all other negative values will
be treated normally, as some time before Jan 1st 1970 UTC, but "-1" will
keep its current meaning of "unknown".

So, regardless of which KIP gets merged first, we need to make sure that
the broker ends up behaving as follows when the timestamp is used as the
comparison value:

1. if neither of them is -1, just compare the values normally as currently
proposed in KIP-280.
2. if one of them is -1 and the other is not, choose the other.
3. if both of them are -1, use the offset as the tie-breaker.
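
A minimal sketch of the three rules above, to make the intended ordering
concrete. The class and method names are hypothetical and are not Kafka
APIs. Falling back to the offset when two known timestamps are equal is an
assumption, based on the tie-breaker approach agreed earlier in this thread.

```java
// Sketch only: hypothetical names, not part of the Kafka code base.
final class TimestampVersionComparator {
    /** Sentinel that keeps its current meaning of "unknown timestamp". */
    static final long UNKNOWN_TIMESTAMP = -1L;

    /**
     * Returns true if the candidate record should replace the stored one
     * when the timestamp is the compaction comparison value.
     */
    static boolean candidateWins(long storedTs, long storedOffset,
                                 long candidateTs, long candidateOffset) {
        boolean storedUnknown = storedTs == UNKNOWN_TIMESTAMP;
        boolean candidateUnknown = candidateTs == UNKNOWN_TIMESTAMP;

        // Rule 3: both unknown, so the offset is the tie-breaker.
        if (storedUnknown && candidateUnknown) {
            return candidateOffset > storedOffset;
        }
        // Rule 2: exactly one is unknown, so choose the other.
        if (storedUnknown) {
            return true;
        }
        if (candidateUnknown) {
            return false;
        }
        // Rule 1: neither is unknown, so compare normally. Other negative
        // values are ordinary timestamps before 1970 (KIP-228).
        if (candidateTs != storedTs) {
            return candidateTs > storedTs;
        }
        // Assumption: equal known timestamps fall back to the offset.
        return candidateOffset > storedOffset;
    }

    public static void main(String[] args) {
        // Stored record has an unknown timestamp, candidate is pre-1970.
        System.out.println(candidateWins(-1L, 10L, -5L, 3L));
        // Both unknown: the larger offset wins.
        System.out.println(candidateWins(-1L, 10L, -1L, 11L));
    }
}
```

Note that under rule 2 a record with a known timestamp beats one with "-1"
even if it has a smaller offset.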


Guozhang

On Mon, Jun 4, 2018 at 2:04 AM, Luís Cabral <luis_cabral@yahoo.com.invalid>
wrote:

>  Hi all,
>
>
> After a thorough discussion, this KIP is now ready to go into a vote:
>
> KIP-280: Enhanced log compaction - Apache Kafka - Apache Software
> Foundation
>
> Kind Regards,
> Luís Cabral
>
> On Friday, June 1, 2018, 8:51:50 PM GMT+2, Guozhang Wang
> <wangguoz@gmail.com> wrote:
>
>  Hello Luis,
>
> Please feel free to continue with the voting process, as there seem to be
> no further comments on this thread (I have synced with Jun and Ismael
> separately offline and they agree with the approach of adding the fields
> to the offset map for all cases).
>
> We can still continue reviewing the PR while voting on the thread, so
> that it can get into trunk earlier for the next release.
>
>
>
> Guozhang
>
>
> On Mon, May 28, 2018 at 11:04 AM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>
> > Luis,
> >
> > this week is feature freeze for the upcoming 2.0 release and most people
> > are focused on getting their PRs merged. Thus, for this week and the next
> > (until code freeze), KIPs for 2.1 are not a high priority for most people.
> >
> > Please bear with us. Thanks for your understanding.
> >
> >
> > -Matthias
> >
> > On 5/28/18 5:21 AM, Luís Cabral wrote:
> > >  Hi Guozhang,
> > >
> > > It doesn't look like there will be much feedback here.
> > > > Is it alright if I just update the spec back to a standardized
> > > > behaviour and move this along?
> > > >
> > > > Cheers,
> > > > Luis
> > > >
> > > > On Thursday, May 24, 2018, 11:20:01 AM GMT+2, Luis Cabral
> > > > <luis_cabral@yahoo.com> wrote:
> > >
> > >  Hi Jun / Ismael,
> > >
> > > Any chance to get your opinion on this?
> > > Thanks in advance!
> > >
> > > Regards,
> > > Luís
> > >
> > >> On 22 May 2018, at 17:30, Guozhang Wang <wangguoz@gmail.com> wrote:
> > >>
> > >> Hello Luís,
> > >>
> > >> While reviewing your PR I realized my previous calculation of the
> > >> memory usage was incorrect: in fact, in the current implementation,
> > >> each entry in the memory-bounded cache is 16 (default MD5 hash digest
> > >> length) + 8 (long type) = 24 bytes, and if we add the long-typed
> > >> version value it is 32 bytes. I.e. each entry grows by 33%, rather
> > >> than doubling.
> > >>
> > >> After redoing the math I'm leaning a bit towards just adding this
> > >> entry for all cases rather than treating timestamp differently from
> > >> the others (sorry for going back and forth, but I just want to make
> > >> sure we've got a good balance between efficiency and semantic
> > >> consistency). I've also chatted with Jun and Ismael about this
> > >> (cc'ed), and maybe you guys can chime in here as well.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Tue, May 22, 2018 at 6:45 AM, Luís Cabral
> > >> <Luis_Cabral@yahoo.com.invalid> wrote:
> > >>
> > >>> Hi Matthias / Guozhang,
> > >>>
> > >>> Were the questions clarified?
> > >>> Please feel free to add more feedback, otherwise it would be nice to
> > >>> move this topic onwards 😊
> > >>>
> > >>> Kind Regards,
> > >>> Luís Cabral
> > >>>
> > >>> From: Guozhang Wang
> > >>> Sent: 09 May 2018 20:00
> > >>> To: dev@kafka.apache.org
> > >>> Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
> > >>>
> > >>> I have thought about consistency in strategy vs. practical concerns
> > >>> about storage convenience and its impact on compaction
> > >>> effectiveness.
> > >>>
> > >>> The difference between timestamp and the header key-value pairs is
> > >>> that for the latter, as I mentioned before, "it is arguably out of
> > >>> Kafka's control, and indeed users may (mistakenly) generate many
> > >>> records with the same key and the same header value." So giving up
> > >>> tie breakers may result in very poor compaction effectiveness when
> > >>> that happens, while for timestamps the likelihood of this is
> > >>> considered very small.
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>> On Sun, May 6, 2018 at 8:55 PM, Matthias J. Sax
> > >>> <matthias@confluent.io> wrote:
> > >>>
> > >>>> Thanks.
> > >>>>
> > >>>> To reverse the question: if this argument holds, why does it not
> > >>>> apply to the case when the header key is used as compaction
> > >>>> attribute?
> > >>>>
> > >>>> I am not against keeping both records in case timestamps are equal,
> > >>>> but shouldn't we apply the same strategy for all cases and not use
> > >>>> offset as tie-breaker at all?
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>>> On 5/6/18 8:47 PM, Guozhang Wang wrote:
> > >>>>> Hello Matthias,
> > >>>>>
> > >>>>> The related discussion was in the PR:
> > >>>>> https://github.com/apache/kafka/pull/4822#discussion_r184588037
> > >>>>>
> > >>>>> The concern is that, to use offset as tie breaker, we need to
> > >>>>> double the size of each entry in the bounded compaction cache, and
> > >>>>> hence largely reduce the effectiveness of the compaction itself.
> > >>>>> Since with millisecond timestamps ties for the same key are
> > >>>>> expected to be rare, I think it would be a reasonable tradeoff to
> > >>>>> make.
> > >>>>>
> > >>>>>
> > >>>>> Guozhang
> > >>>>>
> > >>>>> On Sun, May 6, 2018 at 9:37 AM, Matthias J. Sax
> > >>>>> <matthias@confluent.io> wrote:
> > >>>>>
> > >>>>>> Hi,
> > >>>>>>
> > >>>>>> I just updated myself on this KIP. One question (maybe it was
> > >>>>>> discussed and I missed it): what is the motivation to not use the
> > >>>>>> offset as tie breaker for the "timestamp" case? Isn't this
> > >>>>>> inconsistent behavior?
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>>
> > >>>>>>> On 5/2/18 2:07 PM, Guozhang Wang wrote:
> > >>>>>>> Hello Luís,
> > >>>>>>>
> > >>>>>>> Sorry for the late reply.
> > >>>>>>>
> > >>>>>>> My understanding is that such duplicates will only happen if
> > >>>>>>> the non-offset version values, either the timestamp or some
> > >>>>>>> long-typed header key, are the same (i.e. we cannot break ties).
> > >>>>>>>
> > >>>>>>> 1. For timestamp, which is in milliseconds, I think in practice
> > >>>>>>> the likelihood of records with the same key and the same
> > >>>>>>> millisecond timestamp is very small. Hence the number of
> > >>>>>>> duplicates should be very small.
> > >>>>>>>
> > >>>>>>> 2. For a long-typed header key, it is arguably out of Kafka's
> > >>>>>>> control, and indeed users may (mistakenly) generate many records
> > >>>>>>> with the same key and the same header value.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> So I'd like to propose a counter-offer: for 1), we still use
> > >>>>>>> only 8 bytes and allow for potential duplicates due to ties; for
> > >>>>>>> 2) we use 16 bytes to always break ties. The motivation for
> > >>>>>>> distinguishing 1) and 2) is that I expect 1) to be much more
> > >>>>>>> common, and hence worth handling specially so that cleaning is
> > >>>>>>> more effective. WDYT?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Guozhang
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Wed, May 2, 2018 at 2:36 AM, Luís Cabral
> > >>>>>>> <luis_cabral@yahoo.com.invalid> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Guozhang,
> > >>>>>>>>
> > >>>>>>>> Have you managed to have a look at my reply?
> > >>>>>>>> How do you feel about this?
> > >>>>>>>>
> > >>>>>>>> Kind Regards,
> > >>>>>>>> Luís Cabral
> > >>>>>>>>
> > >>>>>>>> On Monday, April 30, 2018, 9:27:15 AM GMT+2, Luís Cabral
> > >>>>>>>> <luis_cabral@yahoo.com> wrote:
> > >>>>>>>>
> > >>>>>>>>  Hi Guozhang,
> > >>>>>>>>
> > >>>>>>>> I understand the argument, but this is a hazardous compromise
> > >>>>>>>> for using Kafka as an event store (which is my original
> > >>>>>>>> intention).
> > >>>>>>>>
> > >>>>>>>> I expect to have many duplicated messages in Kafka, as the
> > >>>>>>>> overall architecture being used allows the producer to re-send
> > >>>>>>>> a fresh state of the backed data into Kafka. This scenario is
> > >>>>>>>> not common, as the intention is for Kafka to bear the weight of
> > >>>>>>>> replaying all the records for new consumers, but it will
> > >>>>>>>> occasionally happen.
> > >>>>>>>>
> > >>>>>>>> As there are plenty of records which are not updated
> > >>>>>>>> frequently, this would leave the topic with a surplus of quite
> > >>>>>>>> a few million duplicate records (increasing every time the
> > >>>>>>>> above-mentioned function is applied).
> > >>>>>>>>
> > >>>>>>>> I would prefer to have the temporary memory footprint of 8
> > >>>>>>>> bytes per record whenever the compaction is run (only when not
> > >>>>>>>> in 'offset' mode) than to allow the topic to run into this
> > >>>>>>>> state.
> > >>>>>>>>
> > >>>>>>>> What do you think? Is this scenario too specific to me, or do
> > >>>>>>>> you believe that it could happen to other clients as well?
> > >>>>>>>>
> > >>>>>>>> Thanks again for the continued discussion!
> > >>>>>>>> Cheers,
> > >>>>>>>> Luis
> > >>>>>>>>
> > >>>>>>>> On Friday, April 27, 2018, 8:21:13 PM GMT+2, Guozhang Wang
> > >>>>>>>> <wangguoz@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>> Hello Luis,
> > >>>>>>>>
> > >>>>>>>> When comparing the versions returns `equal`, the original
> > >>>>>>>> proposal is to use the offset as the tie breaker. My previous
> > >>>>>>>> comment is that
> > >>>>>>>>
> > >>>>>>>> 1) when we build the map calling `put`, if there is already an
> > >>>>>>>> entry for the key, compare its stored version, and replace it
> > >>>>>>>> if the put record's version is "no smaller than" the stored
> > >>>>>>>> record's: this is because when building the map we are always
> > >>>>>>>> going from smaller offsets to larger ones.
> > >>>>>>>>
> > >>>>>>>> 2) when making a second pass to determine if each record should
> > >>>>>>>> be retained based on the map, we do not try to break the tie if
> > >>>>>>>> the map's returned version is the same, but always treat it as
> > >>>>>>>> "keep". In this case, when we are comparing a record with
> > >>>>>>>> itself stored in the offset map, version comparison would
> > >>>>>>>> return `equals`. As I mentioned in the PR, one caveat is that
> > >>>>>>>> we may indeed have multiple records with the same key and the
> > >>>>>>>> same version, but once a record with a newer version is
> > >>>>>>>> appended they will be deleted.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Does that make sense?
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Fri, Apr 27, 2018 at 4:20 AM, Luís Cabral
> > >>>>>>>> <luis_cabral@yahoo.com.invalid> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi,
> > >>>>>>>>>
> > >>>>>>>>> I was updating the PR to match the latest decisions and
> > >>>>>>>>> noticed (or rather, the integration tests noticed) that,
> > >>>>>>>>> without storing the offset, the cache doesn't know when to
> > >>>>>>>>> keep the record itself.
> > >>>>>>>>>
> > >>>>>>>>> This is because, after the cache is populated, all the records
> > >>>>>>>>> are compared against the stored ones, so
> > >>>>>>>>> "Record{key:A,offset:1, version:1}" will compare against
> > >>>>>>>>> itself and be flagged as "don't keep", since we only compared
> > >>>>>>>>> based on the version and didn't check to see if the offset was
> > >>>>>>>>> the same or not.
> > >>>>>>>>>
> > >>>>>>>>> This sort of invalidates not storing the offset in the cache,
> > >>>>>>>>> unfortunately, and the binary footprint increases two-fold
> > >>>>>>>>> when "offset" is not used as a compaction strategy.
> > >>>>>>>>>
> > >>>>>>>>> Guozhang: Is it ok with you if we go back on this decision and
> > >>>>>>>>> leave the offset as a tie-breaker?
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Kind Regards,
> > >>>>>>>>> Luis
> > >>>>>>>>>
> > >>>>>>>>> On Friday, April 27, 2018, 11:11:55 AM GMT+2, Luís Cabral
> > >>>>>>>>> <luis_cabral@yahoo.com.INVALID> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hi,
> > >>>>>>>>>
> > >>>>>>>>> The KIP is now updated with the results of the byte array
> > >>>>>>>>> discussion.
> > >>>>>>>>>
> > >>>>>>>>> This is my first contribution to Kafka, so I'm not sure what
> > >>>>>>>>> the processes are. Is it now acceptable to take this into a
> > >>>>>>>>> vote, or should I ask for more contributors to join the
> > >>>>>>>>> discussion first?
> > >>>>>>>>>
> > >>>>>>>>> Kind Regards,
> > >>>>>>>>> Luis
> > >>>>>>>>>
> > >>>>>>>>> On Friday, April 27, 2018, 6:12:03 AM GMT+2, Guozhang Wang
> > >>>>>>>>> <wangguoz@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hello Luís,
> > >>>>>>>>>
> > >>>>>>>>>> Offset is an integer? I've only noticed it being resolved as
> > >>>>>>>>>> a long so far.
> > >>>>>>>>>
> > >>>>>>>>> You are right, offset is a long.
> > >>>>>>>>>
> > >>>>>>>>> As for timestamp / other types, I left a comment in your PR
> > >>>>>>>>> about handling tie breakers.
> > >>>>>>>>>
> > >>>>>>>>>> Given these arguments, is this point something that you
> > >>>>>>>>>> absolutely must have?
> > >>>>>>>>>
> > >>>>>>>>> No, I do not have a strong use case in mind for arbitrary byte
> > >>>>>>>>> arrays; I was just thinking that if we are going to enhance
> > >>>>>>>>> log compaction, why not generalize it more :)
> > >>>>>>>>>
> > >>>>>>>>> Your concern about the memory usage makes sense. I'm happy to
> > >>>>>>>>> take my suggestion back and enforce only long-typed fields.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Guozhang
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Thu, Apr 26, 2018 at 1:44 AM, Luís Cabral
> > >>>>>>>>> <luis_cabral@yahoo.com.invalid> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi,
> > >>>>>>>>>>
> > >>>>>>>>>> bq. have a integer typed OffsetMap (for offset)
> > >>>>>>>>>>
> > >>>>>>>>>> Offset is an integer? I've only noticed it being resolved as
> > >>>>>>>>>> a long so far.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> bq. long typed OffsetMap (for timestamp)
> > >>>>>>>>>>
> > >>>>>>>>>> We would still need to store the offset, as it is functioning
> > >>>>>>>>>> as a tie-breaker. Not that this is a big deal, we can easily
> > >>>>>>>>>> have both (as currently done on the PR).
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> bq. For the byte array typed offset map, we can use a general
> > >>>>>>>>>> hashmap, where the hashmap's CAPACITY will be reasoned from
> > >>>>>>>>>> the given "val memory: Int" parameter
> > >>>>>>>>>>
> > >>>>>>>>>> If you have a map with 128 byte capacity, then store a value
> > >>>>>>>>>> with 16 bytes and another with 32 bytes, how many free slots
> > >>>>>>>>>> do you have left in this map?
> > >>>>>>>>>>
> > >>>>>>>>>> You can make this work, but I think you would need to
> > >>>>>>>>>> re-design the whole log cleaner approach, which implies
> > >>>>>>>>>> changing some of the already existing configurations (like
> > >>>>>>>>>> "log.cleaner.io.buffer.load.factor"). I would rather maintain
> > >>>>>>>>>> backwards compatibility as much as possible in this KIP, and
> > >>>>>>>>>> if this means that using "foo" / "bar" or "2.1-a" / "3.20-b"
> > >>>>>>>>>> as record versions is not viable, then so be it.
> > >>>>>>>>>>
> > >>>>>>>>>> Given these arguments, is this point something that you
> > >>>>>>>>>> absolutely must have? I'm still sort of hoping that you are
> > >>>>>>>>>> just entertaining the idea and are ok with having a long (now
> > >>>>>>>>>> conceded to be unsigned, so the byte arrays can be compared
> > >>>>>>>>>> directly).
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Kind Regards,
> > >>>>>>>>>> Luis
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> -- Guozhang
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>>
> > >>> --
> > >>> -- Guozhang
> > >>>
> > >>>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> >
> >
>
>
> --
> -- Guozhang




-- 
-- Guozhang
