kafka-dev mailing list archives

From Jay Kreps <...@confluent.io>
Subject Re: [DISCUSS] KIP-81: Max in-flight fetches
Date Tue, 13 Dec 2016 06:21:13 GMT
I think the question is whether we have a truly optimal strategy for
deriving the partition- and fetch-level configs from the global setting. If
we do then we should just get rid of them. If not, then if we can at least
derive usually good and never terrible settings from the global limit at
initialization time maybe we can set them automatically unless the user
overrides with an explicit config. Even the latter would let us mark it low
priority which at least takes it off the list of things you have to grok to
use the consumer which I suspect would be much appreciated by our poor
users.
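
As a rough illustration of the "derive unless overridden" idea, here is a minimal sketch in Python. It is not what the KIP specifies: the clamping rule, the helper name derive_fetch_configs, and a consumer-side buffer.memory setting are all assumptions made for illustration. The 50MB default for fetch.max.bytes is the one mentioned in this thread, and 1MB is the documented default for max.partition.fetch.bytes.

```python
MB = 1024 * 1024

# Defaults: 50MB for fetch.max.bytes (as mentioned in the thread) and
# 1MB for max.partition.fetch.bytes (the documented consumer default).
DEFAULT_FETCH_MAX_BYTES = 50 * MB
DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * MB


def derive_fetch_configs(buffer_memory, overrides=None):
    """Derive fetch-level settings from one global memory budget.

    Hypothetical helper: the rule used here (clamp each default so it never
    exceeds the level above it) is an assumption, not the KIP's behaviour.
    Explicit user overrides always win over derived values.
    """
    if buffer_memory <= 0:
        raise ValueError("buffer_memory must be positive")

    # A single fetch response should never be larger than the whole budget.
    fetch_max = min(DEFAULT_FETCH_MAX_BYTES, buffer_memory)
    # A single partition's share should never be larger than one fetch.
    partition_max = min(DEFAULT_MAX_PARTITION_FETCH_BYTES, fetch_max)

    derived = {
        "buffer.memory": buffer_memory,  # proposed name, assumed here
        "fetch.max.bytes": fetch_max,
        "max.partition.fetch.bytes": partition_max,
    }
    # Apply explicit overrides last so the user keeps full control.
    derived.update(overrides or {})
    return derived


if __name__ == "__main__":
    # The "lazy user" case: hand the consumer 64MB and set nothing else.
    for key, value in derive_fetch_configs(64 * MB).items():
        print(key, value)
```

With a rule like this the two lower-level settings could be marked low priority, since a user who sets only the global limit still gets values that never exceed it.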

Regardless it'd be nice to make sure we get an explanation of the
relationships between the remaining memory configs in the KIP and in the
docs.

I agree that buffer.memory isn't bad.

-Jay


On Mon, Dec 12, 2016 at 2:56 PM, Jason Gustafson <jason@confluent.io> wrote:

> Yeah, that's a good point. Perhaps in retrospect, it would have been better
> to define `buffer.memory` first and let `fetch.max.bytes` be based off of
> it. I like `buffer.memory` since it gives the consumer nice symmetry with
> the producer and its generic naming gives us some flexibility internally
> with how we use it. We could still do that I guess, if we're willing to
> deprecate `fetch.max.bytes` (one release after adding it!).
>
> As for `max.partition.fetch.bytes`, it's noted in KIP-74 that it is still
> useful in Kafka Streams, but I agree it makes sense to lower its priority
> in favor of `fetch.max.bytes`.
>
> -Jason
>
> On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <jay@confluent.io> wrote:
>
> > Jason, it's not just decompression but also the conversion from packed
> > bytes to java objects, right? That can be even larger than the
> > decompression blow up. I think this may be okay, the problem may just be
> > that the naming is a bit misleading. In the producer you are literally
> > allocating a buffer of that size, so the name buffer.memory makes sense. In
> > this case it is something more like max.bytes.read.per.poll.call (terrible
> > name, but maybe something like that?).
> >
> > Mickael, I'd second Jason's request for the default and expand on it. We
> > currently have several consumer-related memory
> > settings--max.partition.fetch.bytes, fetch.max.bytes. I don't think it is
> > clear today how to set these. For example we mark max.partition.fetch.bytes
> > as high importance and fetch.max.bytes as medium, but it seems like it
> > would be the other way around. Can we think this through from the point of
> > view of a lazy user? I.e. I have 64MB of space to use for my consumer, in
> > an ideal world I'd say, "hey consumer here is 64MB go use that as
> > efficiently as possible" and not have to tune a bunch of individual things
> > with complex relationships. Maybe one or both of the existing settings can
> > either be eliminated or at the least marked as low priority and we can
> > infer a reasonable default from the new config you're introducing?
> >
> > -jay
> >
> > On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <jason@confluent.io>
> > wrote:
> >
> > > Hi Mickael,
> > >
> > > I think the approach looks good, just a few minor questions:
> > >
> > > 1. The KIP doesn't say what the default value of `buffer.memory` will be.
> > > Looks like we use 50MB as the default for `fetch.max.bytes`, so perhaps it
> > > makes sense to set the default based on that. Might also be worth
> > > mentioning somewhere the constraint between the two configs.
> > > 2. To clarify, this limit only affects the uncompressed size of the fetched
> > > data, right? The consumer may still exceed it in order to store the
> > > decompressed record data. We delay decompression until the records are
> > > returned to the user, but because of max.poll.records, we may end up
> > > holding onto the decompressed data from a single partition for a few
> > > iterations. I think this is fine, but probably worth noting in the KIP.
> > > 3. Is there any risk using the MemoryPool that, after we fill up the memory
> > > with fetch data, we can starve the coordinator's connection? Suppose, for
> > > example, that we send a bunch of pre-fetches right before returning to the
> > > user. These fetches might return before the next call to poll(), in which
> > > case we might not have enough memory to receive heartbeats, which would
> > > block us from sending additional heartbeats until the next call to poll().
> > > Not sure it's a big problem since heartbeats are tiny, but might be worth
> > > thinking about.
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > > On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison <mickael.maison@gmail.com>
> > > wrote:
> > >
> > > > It's been a few days since the last comments. KIP-72 vote seems to
> > > > have passed so if I don't get any new comments I'll start the vote on
> > > > Monday.
> > > > Thanks
> > > >
> > > > On Mon, Nov 14, 2016 at 6:25 PM, radai <radai.rosenblatt@gmail.com> wrote:
> > > > > +1 - there is a need for an effective way to control kafka memory
> > > > > consumption - both on the broker and on clients.
> > > > > i think we could even reuse the exact same param name -
> > > > > *queued.max.bytes* - as it would serve the exact same purpose.
> > > > >
> > > > > also (and again it's the same across the broker and clients) this bound
> > > > > should also cover decompression, at some point.
> > > > > the problem with that is that to the best of my knowledge the current wire
> > > > > protocol does not declare the final, uncompressed size of anything up front
> > > > > - all we know is the size of the compressed buffer. this may require a
> > > > > format change in the future to properly support?
> > > > >
> > > > > On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison <mickael.maison@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Thanks for all the replies.
> > > > >>
> > > > >> I've updated the KIP:
> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > > >> The main point is to selectively read from sockets instead of
> > > > >> throttling FetchRequests sends. I also mentioned it will be reusing
> > > > >> the MemoryPool implementation created in KIP-72 instead of adding
> > > > >> another memory tracking method.
> > > > >>
> > > > >> Please have another look. As always, comments are welcome!
> > > > >>
> > > > >> On Thu, Nov 10, 2016 at 2:47 AM, radai <radai.rosenblatt@gmail.com> wrote:
> > > > >> > selectively reading from sockets achieves memory control (up to and not
> > > > >> > including talk of (de)compression)
> > > > >> >
> > > > >> > this is exactly what i (also, even mostly) did for kip-72 - which i hope
> > > > >> > in itself should be a reason to think about both KIPs at the same time
> > > > >> > because the changes will be similar (at least in intent) and might result
> > > > >> > in duplicated effort.
> > > > >> >
> > > > >> > a pool API is a way to "scale" all the way from just maintaining a variable
> > > > >> > holding amount of available memory (which is what my current kip-72 code
> > > > >> > does and what this kip proposes IIUC) all the way up to actually re-using
> > > > >> > buffers without any changes to the code using the pool - just drop in a
> > > > >> > different pool impl.
> > > > >> >
> > > > >> > for "edge nodes" (producer/consumer) the performance gain in actually
> > > > >> > pooling large buffers may be arguable, but i suspect for brokers regularly
> > > > >> > operating on 1MB-sized requests (which is the norm at linkedin) the
> > > > >> > resulting memory fragmentation is an actual bottleneck (i have initial
> > > > >> > micro-benchmark results to back this up but have not had the time to do a
> > > > >> > full profiling run).
> > > > >> >
> > > > >> > so basically I'm saying we may be doing (very) similar things in mostly the
> > > > >> > same areas of code.
> > > > >> >
> > > > >> > On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison <mickael.maison@gmail.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> >> Selectively reading from the socket should make it possible to
> > > > >> >> control the memory usage without impacting performance. I've had a look
> > > > >> >> at that today and I can see how that would work.
> > > > >> >> I'll update the KIP accordingly tomorrow.
> > > > >> >>
> > > > >>
> > > >
> > >
> >
>
