kafka-dev mailing list archives

From Dong Lin <lindon...@gmail.com>
Subject Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size
Date Sat, 04 Mar 2017 03:43:29 GMT
Hey Becket,

I haven't looked at the patch yet. But since we are going to try the
split-on-oversize solution, should the KIP also add a sensor that shows the
rate of split per second and the probability of split?

Thanks,
Dong


On Fri, Mar 3, 2017 at 6:39 PM, Becket Qin <becket.qin@gmail.com> wrote:

> Just to clarify, the implementation is basically what I mentioned above
> (split/resend + adjusted estimation evolving algorithm) and changing the
> compression ratio estimation to be per topic.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Mar 3, 2017 at 6:36 PM, Becket Qin <becket.qin@gmail.com> wrote:
>
> > I went ahead and submitted a patch here:
> > https://github.com/apache/kafka/pull/2638
> >
> > Per Joel's suggestion, I changed the compression ratio estimation to be per
> > topic as well. It seems to work well. Since there is an important behavior
> > change and a new sensor is added, I'll keep the KIP and update it accordingly.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Feb 27, 2017 at 3:50 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> >
> >> >
> >> > Let's say we sent the batch over the wire and received a
> >> > RecordTooLargeException. How do we split it? Once we add the message to
> >> > the batch we lose the message-level granularity. We would have to
> >> > decompress, do a deep iteration, split, and compress again, right? This
> >> > looks like a performance bottleneck in the case of multi-topic producers
> >> > like mirror maker.
> >> >
> >>
> >> Yes, but these should be outliers if we do estimation on a per-topic basis
> >> and if we target a conservative-enough compression ratio. The producer
> >> should also avoid sending over the wire if it can be made aware of the
> >> max-message size limit on the broker, and split if it determines that a
> >> record exceeds the broker's config. Ideally this should be part of topic
> >> metadata but is not - so it could be off a periodic describe-configs
> >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-DescribeConfigsRequest>
> >> (which isn't available yet). This doesn't remove the need to split and
> >> recompress though.
> >>
> >>
> >> > On Mon, Feb 27, 2017 at 10:51 AM, Becket Qin <becket.qin@gmail.com> wrote:
> >> >
> >> > > Hey Mayuresh,
> >> > >
> >> > > 1) The batch would be split when a RecordTooLargeException is received.
> >> > > 2) Not lowering the actual compression ratio, but lowering the estimated
> >> > > compression ratio according to the Actual Compression Ratio (ACR).
> >> > >
> >> > > An example: let's start with Estimated Compression Ratio (ECR) = 1.0.
> >> > > Say the ACR is ~0.8. Instead of letting the ECR drop to 0.8 very
> >> > > quickly, we only drop it by 0.001 every time we see ACR < ECR. However,
> >> > > once we see ACR > ECR, we increment ECR by 0.05. If a
> >> > > RecordTooLargeException is received, we reset the ECR back to 1.0 and
> >> > > split the batch.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jiangjie (Becket) Qin
> >> > >
> >> > >
> >> > >
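For concreteness, the asymmetric adjustment rule described above could be sketched as follows (a toy model, not the actual producer code; the class name is illustrative and the step sizes simply mirror the numbers in the example):

```python
class CompressionRatioEstimator:
    """Sketch of the ECR adjustment described above (illustrative only)."""

    DECREMENT = 0.001  # slow decrease when a batch compresses better than estimated
    INCREMENT = 0.05   # fast increase when a batch compresses worse than estimated

    def __init__(self):
        self.ecr = 1.0  # start conservatively: assume no compression

    def observe(self, actual_ratio):
        """Update the estimate from the actual compression ratio of a batch."""
        if actual_ratio < self.ecr:
            self.ecr = max(actual_ratio, self.ecr - self.DECREMENT)
        else:
            self.ecr = min(1.0, self.ecr + self.INCREMENT)

    def on_record_too_large(self):
        """Broker rejected the batch: reset the estimate; the caller splits the batch."""
        self.ecr = 1.0
```

The asymmetry (tiny decrements, larger increments) keeps the estimate biased toward the conservative side, so oversized batches stay rare.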
> >> > > On Mon, Feb 27, 2017 at 10:30 AM, Mayuresh Gharat <
> >> > > gharatmayuresh15@gmail.com> wrote:
> >> > >
> >> > > > Hi Becket,
> >> > > >
> >> > > > Seems like an interesting idea.
> >> > > > I had couple of questions :
> >> > > > 1) How do we decide when the batch should be split?
> >> > > > 2) What do you mean by slowly lowering the "actual" compression
> >> ratio?
> >> > > > An example would really help here.
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Mayuresh
> >> > > >
> >> > > > On Fri, Feb 24, 2017 at 3:17 PM, Becket Qin <becket.qin@gmail.com> wrote:
> >> > > >
> >> > > > > Hi Jay,
> >> > > > >
> >> > > > > Yeah, I got your point.
> >> > > > >
> >> > > > > I think there might be a solution which does not require adding a
> >> > > > > new configuration. We can start from a very conservative compression
> >> > > > > ratio, say 1.0, and lower it very slowly according to the actual
> >> > > > > compression ratio until we hit a point where we have to split a
> >> > > > > batch. At that point, we exponentially back off on the compression
> >> > > > > ratio. The idea is somewhat like TCP. This should help avoid
> >> > > > > frequent splits.
> >> > > > >
> >> > > > > The upper bound of the batch size is also a little awkward today
> >> > > > > because we say the batch size is based on compressed size, but users
> >> > > > > cannot set it to the max message size because that would result in
> >> > > > > oversized messages. With this change we will be able to allow users
> >> > > > > to set the batch size close to the max message size.
> >> > > > >
> >> > > > > However, the downside is that there could be latency spikes in the
> >> > > > > system in this case due to the splitting, especially when there are
> >> > > > > many messages that need to be split at the same time. That could
> >> > > > > potentially be an issue for some users.
> >> > > > >
> >> > > > > What do you think about this approach?
> >> > > > >
> >> > > > > Thanks,
> >> > > > >
> >> > > > > Jiangjie (Becket) Qin
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Thu, Feb 23, 2017 at 1:31 PM, Jay Kreps <jay@confluent.io> wrote:
> >> > > > >
> >> > > > > > Hey Becket,
> >> > > > > >
> >> > > > > > Yeah, that makes sense.
> >> > > > > >
> >> > > > > > I agree that you'd really have to both fix the estimation (i.e.
> >> > > > > > make it per topic or make it better estimate the high percentiles)
> >> > > > > > AND have the recovery mechanism. If you are underestimating often
> >> > > > > > and then paying a high recovery price, that won't fly.
> >> > > > > >
> >> > > > > > I think you take my main point though, which is just that I hate
> >> > > > > > to expose these super low-level options to users because it is so
> >> > > > > > hard to explain to people what they mean and how they should set
> >> > > > > > them. So if it is possible to make either some combination of
> >> > > > > > better estimation and splitting or better tolerance of overage,
> >> > > > > > that would be preferable.
> >> > > > > >
> >> > > > > > -Jay
> >> > > > > >
> >> > > > > > On Thu, Feb 23, 2017 at 11:51 AM, Becket Qin <becket.qin@gmail.com> wrote:
> >> > > > > >
> >> > > > > > > @Dong,
> >> > > > > > >
> >> > > > > > > Thanks for the comments. The default behavior of the producer
> >> > > > > > > won't change. If the users want to use the uncompressed message
> >> > > > > > > size, they probably will also bump up the batch size to somewhere
> >> > > > > > > close to the max message size. This would be in the document. BTW
> >> > > > > > > the default batch size is 16K, which is pretty small.
> >> > > > > > >
> >> > > > > > > @Jay,
> >> > > > > > >
> >> > > > > > > Yeah, we actually debated quite a bit internally what the best
> >> > > > > > > solution to this is.
> >> > > > > > >
> >> > > > > > > I completely agree it is a bug. In practice we usually leave some
> >> > > > > > > headroom to allow the compressed size to grow a little if the
> >> > > > > > > original messages are not compressible, for example, 1000 KB
> >> > > > > > > instead of exactly 1 MB. It is likely safe enough.
> >> > > > > > >
> >> > > > > > > The major concern for the rejected alternative is performance. It
> >> > > > > > > largely depends on how frequently we need to split a batch, i.e.
> >> > > > > > > how likely the estimation is to go off. If we only need to do the
> >> > > > > > > split work occasionally, the cost would be amortized so we don't
> >> > > > > > > need to worry about it too much. However, it looks like for a
> >> > > > > > > producer with shared topics, the estimation is always off. As an
> >> > > > > > > example, consider two topics, one with compression ratio 0.6 and
> >> > > > > > > the other 0.2. Assuming exactly the same traffic, the average
> >> > > > > > > compression ratio would be roughly 0.4, which is not right for
> >> > > > > > > either of the topics. So almost half of the batches (those of the
> >> > > > > > > topic with 0.6 compression ratio) will end up larger than the
> >> > > > > > > configured batch size. When it comes to more topics, such as with
> >> > > > > > > mirror maker, this becomes more unpredictable. To avoid frequent
> >> > > > > > > rejection/split of the batches, we need to configure the batch
> >> > > > > > > size pretty conservatively. This could actually hurt the
> >> > > > > > > performance because we are shoehorning the messages that are
> >> > > > > > > highly compressible into a small batch so that the other topics
> >> > > > > > > that are not that compressible will not become too large with the
> >> > > > > > > same batch size. At LinkedIn, our batch size is configured to
> >> > > > > > > 64 KB because of this. I think we may actually have better
> >> > > > > > > batching if we just use the uncompressed message size and an
> >> > > > > > > 800 KB batch size.
> >> > > > > > >
> >> > > > > > > We did not think about loosening the message size restriction,
> >> > > > > > > but that sounds like a viable solution given that the consumer
> >> > > > > > > now can fetch oversized messages. One concern would be that on
> >> > > > > > > the broker side oversized messages will bring more memory
> >> > > > > > > pressure. With KIP-92, we may mitigate that, but the memory
> >> > > > > > > allocation for large messages may not be very GC friendly. I need
> >> > > > > > > to think about this a little more.
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > >
> >> > > > > > > Jiangjie (Becket) Qin
> >> > > > > > >
> >> > > > > > >
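The two-topic arithmetic above can be checked with a few lines (the ratios 0.6 and 0.2 come from the message; the 16 KB limit is the default batch size mentioned earlier, and the packing model is a simplification for illustration):

```python
# Two topics sharing one compression-ratio estimate, as in the example above.
batch_size_limit = 16 * 1024                 # configured (compressed) batch size
ratios = {"topic_a": 0.6, "topic_b": 0.2}    # actual per-topic compression ratios

# With equal traffic, a shared estimate converges to roughly the mean ratio:
shared_estimate = sum(ratios.values()) / len(ratios)   # 0.4

# The producer then packs uncompressed bytes up to limit / estimate:
uncompressed_budget = batch_size_limit / shared_estimate

for topic, actual in ratios.items():
    compressed = uncompressed_budget * actual
    verdict = "oversized" if compressed > batch_size_limit else "ok"
    print(topic, round(compressed), verdict)
```

Under this model topic_a's batches come out 1.5x over the limit while topic_b's come out well under it, which is exactly the "not right for either topic" problem.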
> >> > > > > > > On Wed, Feb 22, 2017 at 8:57 PM, Jay Kreps <jay@confluent.io> wrote:
> >> > > > > > >
> >> > > > > > > > Hey Becket,
> >> > > > > > > >
> >> > > > > > > > I get the problem we want to solve with this, but I don't
> >> > > > > > > > think this is something that makes sense as a user-controlled
> >> > > > > > > > knob that everyone sending data to kafka has to think about.
> >> > > > > > > > It is basically a bug, right?
> >> > > > > > > >
> >> > > > > > > > First, as a technical question: is it true that using the
> >> > > > > > > > uncompressed size for batching actually guarantees that you
> >> > > > > > > > observe the limit? I think that implies that compression
> >> > > > > > > > always makes the messages smaller, which I think is usually
> >> > > > > > > > true but is not guaranteed, right? E.g. if someone encrypts
> >> > > > > > > > their data, which tends to randomize it, and then enables
> >> > > > > > > > compression, it could get slightly bigger?
> >> > > > > > > >
> >> > > > > > > > I also wonder if the rejected alternatives you describe
> >> > > > > > > > couldn't be made to work: basically try to be a bit better at
> >> > > > > > > > estimation and recover when we guess wrong. I don't think the
> >> > > > > > > > memory usage should be a problem: isn't it the same memory
> >> > > > > > > > usage the consumer of that topic would need? And can't you do
> >> > > > > > > > the splitting and recompression in a streaming fashion? If we
> >> > > > > > > > can make the estimation miss rate low and the recovery cost is
> >> > > > > > > > just ~2x the normal cost for that batch, that should be
> >> > > > > > > > totally fine, right? (It's technically true you might have to
> >> > > > > > > > split more than once, but since you halve it each time I think
> >> > > > > > > > you should get a number of halvings that is logarithmic in the
> >> > > > > > > > miss size, which, with better estimation, you'd hope would be
> >> > > > > > > > super duper small.)
> >> > > > > > > >
> >> > > > > > > > Alternatively maybe we could work on the other side of the
> >> > > > > > > > problem and try to make it so that a small miss on message
> >> > > > > > > > size isn't a big problem. I think the original issue was that
> >> > > > > > > > max size and fetch size were tightly coupled, and the way
> >> > > > > > > > memory in the consumer worked you really wanted fetch size to
> >> > > > > > > > be as small as possible because you'd use that much memory per
> >> > > > > > > > fetched partition and the consumer would get stuck if its
> >> > > > > > > > fetch size wasn't big enough. I think we made some progress on
> >> > > > > > > > that issue, and maybe more could be done there so that a small
> >> > > > > > > > bit of fuzziness around the size would not be an issue?
> >> > > > > > > >
> >> > > > > > > > -Jay
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
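The halving recovery Jay describes can be sketched as follows (a simplified model: a batch is just a list of record sizes, and `compressed_size` stands in for recompressing a piece and measuring it; all names are illustrative):

```python
def split_until_fit(batch, max_size, compressed_size):
    """Recursively halve a batch until each piece fits under max_size.

    `batch` is a list of record sizes (a stand-in for real records) and
    `compressed_size(batch)` estimates the compressed size of a piece.
    The number of halvings is logarithmic in how far the estimate missed.
    """
    if compressed_size(batch) <= max_size or len(batch) <= 1:
        return [batch]
    mid = len(batch) // 2
    return (split_until_fit(batch[:mid], max_size, compressed_size)
            + split_until_fit(batch[mid:], max_size, compressed_size))
```

For example, with a toy compression model of ratio 0.5, a batch of eight 1000-byte records (4000 bytes compressed) against a 2500-byte limit splits once into two halves of 2000 bytes each; a 2x miss costs one extra compression pass, roughly the ~2x recovery cost mentioned above.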
> >> > > > > > > > On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin <becket.qin@gmail.com> wrote:
> >> > > > > > > >
> >> > > > > > > > > Hi folks,
> >> > > > > > > > >
> >> > > > > > > > > I would like to start the discussion thread on KIP-126. The
> >> > > > > > > > > KIP proposes adding a new configuration to KafkaProducer to
> >> > > > > > > > > allow batching based on uncompressed message size.
> >> > > > > > > > >
> >> > > > > > > > > Comments are welcome.
> >> > > > > > > > >
> >> > > > > > > > > The KIP wiki is here:
> >> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+size
> >> > > > > > > > >
> >> > > > > > > > > Thanks,
> >> > > > > > > > >
> >> > > > > > > > > Jiangjie (Becket) Qin
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > -Regards,
> >> > > > Mayuresh R. Gharat
> >> > > > (862) 250-7125
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -Regards,
> >> > Mayuresh R. Gharat
> >> > (862) 250-7125
> >> >
> >>
> >
> >
>
