kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: KIP-41: KafkaConsumer Max Records
Date Fri, 08 Jan 2016 02:31:55 GMT
I think bounding the memory footprint of the Java consumer is a general
issue, regardless of whether we do the prefetching in this KIP, as even
today we have no way to manage memory usage on the consumer.

Today on the producer side we bound the memory usage, and we may need to do
the same on the consumer. Here are some discussions about this:

https://issues.apache.org/jira/browse/KAFKA-2045

Guozhang
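The producer-side bound Guozhang refers to (buffer.memory) suggests the shape a consumer-side bound could take. Below is a minimal sketch of the idea under discussion, assuming a simple byte-budget pool; the class and its names are invented for illustration and are not part of Kafka:

```python
import threading

class ByteBudget:
    """Illustrative bounded-memory pool, loosely analogous to the producer's
    buffer.memory: fetch responses reserve bytes before being buffered and
    release them once the application has consumed the records. A sketch of
    the idea, not Kafka's actual implementation."""

    def __init__(self, capacity_bytes):
        self.capacity = capacity_bytes
        self.used = 0
        self.cond = threading.Condition()

    def reserve(self, nbytes, timeout=None):
        # Block until nbytes fit under the budget (or the timeout expires).
        with self.cond:
            ok = self.cond.wait_for(
                lambda: self.used + nbytes <= self.capacity, timeout=timeout)
            if ok:
                self.used += nbytes
            return ok

    def release(self, nbytes):
        with self.cond:
            self.used -= nbytes
            self.cond.notify_all()

pool = ByteBudget(capacity_bytes=64 * 1024 * 1024)
assert pool.reserve(50 * 1024 * 1024)            # first fetch fits
assert not pool.reserve(32 * 1024 * 1024, 0.01)  # second would exceed the bound
pool.release(50 * 1024 * 1024)                   # records handed to the app
```

With such a pool, a fetcher would simply stop issuing fetches while a reservation cannot be satisfied, which is the consumer-side analogue of the producer blocking on a full buffer.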


On Thu, Jan 7, 2016 at 5:21 PM, Aarti Gupta <aartiguptaa@gmail.com> wrote:

> @Jason,
> (apologies, got your name wrong the first time round)
>
> On Thu, Jan 7, 2016 at 5:15 PM, Aarti Gupta <aartiguptaa@gmail.com> wrote:
>
> > Hi Json,
> >
> > I am concerned about how many records can be prefetched into consumer
> > memory.
> > Currently we control the maximum number of bytes per topic and partition
> > by setting fetch.message.max.bytes.
> >
> > The maximum fetched bytes = #no of partitions * fetch.message.max.bytes.
> > However, partitions can be added dynamically, which would mean that a
> > single process (for example a single JVM with multiple consumers) that
> > consumes messages from a large number of partitions may not be able to
> > keep all the prefetched messages in memory.
> >
> > Additionally, if the relative size of messages is highly variable, it
> > would be hard to correlate the max size in bytes for a message fetch
> > with the number of records returned on a poll.
> > We previously observed (in a production setup) that, if the size of a
> > message is greater than fetch.message.max.bytes, the consumer gets
> > stuck. This encouraged us to increase fetch.message.max.bytes to a
> > significantly large value, which would worsen the memory consumption
> > fear described above (when the number of partitions is also large).
> >
> > While there may not be a single magic formula to predict the correct
> > combination of fetch.message.max.bytes and max.poll.records, maybe we
> > can make the prefetch algorithm a mathematical function of
> > fetch.message.max.bytes and the number of partitions?
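The worst case described above can be written down directly as a function of the two quantities. A sketch (the function names and numbers are illustrative, not anything from the KIP):

```python
def worst_case_fetch_memory(num_partitions: int,
                            fetch_message_max_bytes: int) -> int:
    """Upper bound on bytes a consumer may hold after fetching from every
    assigned partition at once (the concern raised above)."""
    return num_partitions * fetch_message_max_bytes

def max_fetch_bytes_for_budget(memory_budget: int, num_partitions: int) -> int:
    """Invert the bound: the largest per-partition fetch size that keeps
    the worst case under a given memory budget."""
    return memory_budget // num_partitions

# 200 partitions at a 10 MB per-partition fetch size is ~2 GB worst case:
assert worst_case_fetch_memory(200, 10 * 1024 * 1024) == 2000 * 1024 * 1024
# Keeping within a 512 MB budget instead caps the fetch size at ~2.6 MB:
assert max_fetch_bytes_for_budget(512 * 1024 * 1024, 200) == 2684354
```

The point of the second function is the one Aarti makes: because partitions can be added dynamically, any fixed fetch.message.max.bytes chosen this way can be invalidated later.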
> >
> > thoughts?
> > Thanks
> > aarti
> >
> > additional unimportant note: the link to the JIRA in the KIP is broken
> >
> > On Thu, Jan 7, 2016 at 2:37 PM, Guozhang Wang <wangguoz@gmail.com>
> wrote:
> >
> >> Thanks Jason. I think it is a good feature to add, +1.
> >>
> >> As suggested in KIP-32, we'd better keep the end state of the KIP wiki
> >> with finalized implementation details rather than leaving a list of
> >> options. I agree that for both fairness and pre-fetching the simpler
> >> approach would be sufficient most of the time. So could we move the
> >> other approach to "rejected"?
> >>
> >> Guozhang
> >>
> >> On Wed, Jan 6, 2016 at 6:14 PM, Gwen Shapira <gwen@confluent.io> wrote:
> >>
> >> > I like the fair-consumption approach you chose - "pull as many
> >> > records as possible from each partition in a similar round-robin
> >> > fashion"; it is very intuitive and close enough to fair.
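The quoted behaviour can be illustrated with a toy version of the draining loop. This is a deliberate simplification (the exact interleaving chosen in the KIP may differ), using plain in-memory buffers in place of the consumer's prefetched partition data:

```python
from collections import deque

def fair_poll(buffers, max_poll_records):
    """Illustrative round-robin drain: take records from each partition's
    prefetch buffer in turn until max.poll.records is reached or all
    buffers are empty. A sketch of the fairness goal, not the consumer's
    actual code."""
    out = []
    while len(out) < max_poll_records and any(buffers.values()):
        for tp, buf in buffers.items():
            if buf and len(out) < max_poll_records:
                out.append((tp, buf.popleft()))
    return out

buffers = {"t-0": deque(range(5)), "t-1": deque(range(2))}
records = fair_poll(buffers, 4)
# Both partitions contribute before either one dominates the batch:
assert [tp for tp, _ in records] == ["t-0", "t-1", "t-0", "t-1"]
```

The property worth preserving is visible in the assertion: no partition is starved as long as it has buffered records.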
> >> >
> >> > Overall, I'm +1 on the KIP. But you'll need a formal vote :)
> >> >
> >> > On Wed, Jan 6, 2016 at 6:05 PM, Jason Gustafson <jason@confluent.io>
> >> > wrote:
> >> >
> >> > > Thanks for the suggestion, Ismael. I updated the KIP.
> >> > >
> >> > > -Jason
> >> > >
> >> > > On Wed, Jan 6, 2016 at 6:57 AM, Ismael Juma <ismael@juma.me.uk>
> >> wrote:
> >> > >
> >> > > > Thanks Jason. I read the KIP and it makes sense to me. A minor
> >> > > > suggestion: in the "Ensuring Fair Consumption" section, there
> >> > > > are 3 paragraphs with 2 examples (2 partitions with 100
> >> > > > max.poll.records and 3 partitions with 30 max.poll.records). I
> >> > > > think you could simplify this by using one of the examples in
> >> > > > the 3 paragraphs.
> >> > > >
> >> > > > Ismael
> >> > > >
> >> > > > On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson <
> jason@confluent.io
> >> >
> >> > > > wrote:
> >> > > >
> >> > > > > I've updated the KIP with some implementation details. I also
> >> > > > > added more discussion on the heartbeat() alternative. The short
> >> > > > > answer for why we rejected this API is that it doesn't seem to
> >> > > > > work well with offset commits. This would tend to make correct
> >> > > > > usage complicated and difficult to explain. Additionally, we
> >> > > > > don't see any clear advantages over having a way to set the max
> >> > > > > records. For example, using max.records=1 would be equivalent
> >> > > > > to invoking heartbeat() on each iteration of the message
> >> > > > > processing loop.
> >> > > > >
> >> > > > > Going back to the discussion on whether we should use a
> >> > > > > configuration value or overload poll(), I'm leaning toward the
> >> > > > > configuration option mainly for compatibility and to keep the
> >> > > > > KafkaConsumer API from getting any more complex. Also, as
> >> > > > > others have mentioned, it seems reasonable to want to tune this
> >> > > > > setting in the same place that the session timeout and
> >> > > > > heartbeat interval are configured. I still feel a little
> >> > > > > uncomfortable with the need to do a lot of configuration tuning
> >> > > > > to get the consumer working for a particular environment, but
> >> > > > > hopefully the defaults are conservative enough that most users
> >> > > > > won't need to. However, if it remains a problem, then we could
> >> > > > > still look into better options for managing the size of
> >> > > > > batches, including overloading poll() with a max records
> >> > > > > argument or possibly implementing a batch scaling algorithm
> >> > > > > internally.
> >> > > > >
> >> > > > > -Jason
> >> > > > >
> >> > > > >
> >> > > > > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <
> >> jason@confluent.io
> >> > >
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi Cliff,
> >> > > > > >
> >> > > > > > I think we're all agreed that the current contract of poll()
> >> > > > > > should be kept. The consumer wouldn't wait for max messages
> >> > > > > > to become available in this proposal; it would only ensure
> >> > > > > > that it never returns more than max messages.
> >> > > > > >
> >> > > > > > -Jason
> >> > > > > >
> >> > > > > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <
> crhyne@signal.co>
> >> > > wrote:
> >> > > > > >
> >> > > > > >> Instead of a heartbeat, I'd prefer poll() to return whatever
> >> > > > > >> messages the client has.  Either a) I don't care if I get
> >> > > > > >> less than my max message limit, or b) I do care and will set
> >> > > > > >> a larger timeout.  Case B is less common than A and is
> >> > > > > >> fairly easy to handle in the application's code.
> >> > > > > >>
> >> > > > > >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <
> >> gwen@confluent.io>
> >> > > > wrote:
> >> > > > > >>
> >> > > > > >> > 1. Agree that TCP window style scaling will be cool. I'll
> >> > > > > >> > try to think of a good excuse to use it ;)
> >> > > > > >> >
> >> > > > > >> > 2. I'm very concerned about the challenges of getting the
> >> > > > > >> > timeouts, heartbeats and max messages right.
> >> > > > > >> >
> >> > > > > >> > Another option could be to expose a "heartbeat" API to
> >> > > > > >> > consumers. If my app is still processing data but is still
> >> > > > > >> > alive, it could initiate a heartbeat to signal it's alive
> >> > > > > >> > without having to handle additional messages.
> >> > > > > >> >
> >> > > > > >> > I don't know if this improves more than it complicates
> >> > > > > >> > though :(
> >> > > > > >> >
> >> > > > > >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson
<
> >> > > > jason@confluent.io>
> >> > > > > >> > wrote:
> >> > > > > >> >
> >> > > > > >> > > Hey Gwen,
> >> > > > > >> > >
> >> > > > > >> > > I was thinking along the lines of TCP window scaling in
> >> > > > > >> > > order to dynamically find a good consumption rate.
> >> > > > > >> > > Basically you'd start off consuming say 100 records and
> >> > > > > >> > > you'd let it increase until the consumption took longer
> >> > > > > >> > > than half the session timeout (for example). You /might/
> >> > > > > >> > > be able to achieve the same thing using pause/resume,
> >> > > > > >> > > but it would be a lot trickier since you have to do it
> >> > > > > >> > > at the granularity of partitions. But yeah, database
> >> > > > > >> > > write performance doesn't always scale in a predictable
> >> > > > > >> > > enough way to accommodate this, so I'm not sure how
> >> > > > > >> > > useful it would be in practice. It might also be more
> >> > > > > >> > > difficult to implement since it wouldn't be as clear
> >> > > > > >> > > when to initiate the next fetch. With a static setting,
> >> > > > > >> > > the consumer knows exactly how many records will be
> >> > > > > >> > > returned on the next call to poll() and can send fetches
> >> > > > > >> > > accordingly.
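That scaling idea can be sketched in the spirit of TCP's additive-increase/multiplicative-decrease; everything below (function name, constants, thresholds) is an assumption for illustration and is not part of the KIP:

```python
def next_batch_size(current: int, batch_ms: float, session_timeout_ms: float,
                    floor: int = 1, ceiling: int = 10_000) -> int:
    """Additive-increase / multiplicative-decrease controller, as in TCP
    congestion control: grow the batch while processing stays fast, halve
    it once a batch takes longer than half the session timeout."""
    if batch_ms > session_timeout_ms / 2:
        return max(floor, current // 2)   # too slow: back off sharply
    return min(ceiling, current + 100)    # fast enough: probe upward

size = 100
size = next_batch_size(size, batch_ms=2_000, session_timeout_ms=30_000)
assert size == 200    # well under half the timeout, so grow
size = next_batch_size(size, batch_ms=20_000, session_timeout_ms=30_000)
assert size == 100    # over half the timeout (15 s here), so halve
```

The difficulty Jason notes shows up even in this toy: the consumer no longer knows ahead of time how many records the next poll() will return, so prefetching cannot be sized exactly.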
> >> > > > > >> > >
> >> > > > > >> > > On the other hand, I do feel a little wary of the need
> >> > > > > >> > > to tune the session timeout and max messages though,
> >> > > > > >> > > since these settings might depend on the environment
> >> > > > > >> > > that the consumer is deployed in. It wouldn't be a big
> >> > > > > >> > > deal if the impact was relatively minor, but getting
> >> > > > > >> > > them wrong can cause a lot of rebalance churn which
> >> > > > > >> > > could keep the consumer from making any progress. It's
> >> > > > > >> > > not a particularly graceful failure.
> >> > > > > >> > >
> >> > > > > >> > > -Jason
> >> > > > > >> > >
> >> > > > > >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen
Shapira <
> >> > > gwen@confluent.io>
> >> > > > > >> wrote:
> >> > > > > >> > >
> >> > > > > >> > > > I can't speak to all use-cases, but for the database
> >> > > > > >> > > > one, I think pause-resume will be necessary in any
> >> > > > > >> > > > case, and therefore dynamic batch sizes are not
> >> > > > > >> > > > needed.
> >> > > > > >> > > >
> >> > > > > >> > > > Databases are really unpredictable regarding response
> >> > > > > >> > > > times - load and locking can affect this. I'm not sure
> >> > > > > >> > > > there's a good way to know you are going into
> >> > > > > >> > > > rebalance hell before it is too late. So if I were
> >> > > > > >> > > > writing code that updates an RDBMS based on Kafka,
> >> > > > > >> > > > I'd pick a reasonable batch size (say 5000 records),
> >> > > > > >> > > > and basically pause, batch-insert all records, commit
> >> > > > > >> > > > and resume.
> >> > > > > >> > > >
> >> > > > > >> > > > Does that make sense?
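That pause / batch-insert / commit / resume pattern can be sketched end to end. The consumer below is a stub so the example is self-contained and runnable; the real KafkaConsumer's pause() and resume() take partition arguments, and everything else here is invented for illustration:

```python
class StubConsumer:
    """Stand-in for KafkaConsumer so the pattern is runnable here; only
    the calls used below (poll/pause/resume/commit) are modeled."""
    def __init__(self, records):
        self.records = list(records)
        self.paused = False
        self.committed = 0

    def poll(self, max_records):
        batch = self.records[:max_records]
        self.records = self.records[max_records:]
        return batch

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False

    def commit(self):
        self.committed += 1

def drain_to_db(consumer, db, batch_size=5000):
    """Gwen's pattern: take a batch, pause so further poll() calls only
    keep the session alive, batch-insert, commit offsets, then resume."""
    batch = consumer.poll(batch_size)
    if not batch:
        return
    consumer.pause()      # poll() now just heartbeats, no new records
    db.extend(batch)      # stand-in for one batch INSERT
    consumer.commit()     # commit only after the insert succeeds
    consumer.resume()

db = []
consumer = StubConsumer(range(12))
drain_to_db(consumer, db, batch_size=5)
assert db == [0, 1, 2, 3, 4] and consumer.committed == 1 and not consumer.paused
```

Committing after the insert, not before, is what keeps the pattern at-least-once: a crash mid-insert replays the uncommitted batch.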
> >> > > > > >> > > >
> >> > > > > >> > > > On Mon, Jan 4, 2016 at 10:37 AM,
Jason Gustafson <
> >> > > > > >> jason@confluent.io>
> >> > > > > >> > > > wrote:
> >> > > > > >> > > >
> >> > > > > >> > > > > Gwen and Ismael,
> >> > > > > >> > > > >
> >> > > > > >> > > > > I agree the configuration option is probably the
> >> > > > > >> > > > > way to go, but I was wondering whether there would
> >> > > > > >> > > > > be cases where it made sense to let the consumer
> >> > > > > >> > > > > dynamically set max messages to adjust for
> >> > > > > >> > > > > downstream slowness. For example, if the consumer is
> >> > > > > >> > > > > writing consumed records to another database, and
> >> > > > > >> > > > > that database is experiencing heavier than expected
> >> > > > > >> > > > > load, then the consumer could halve its current max
> >> > > > > >> > > > > messages in order to adapt without risking rebalance
> >> > > > > >> > > > > hell. It could then increase max messages as the
> >> > > > > >> > > > > load on the database decreases. It's basically an
> >> > > > > >> > > > > easier way to handle flow control than we provide
> >> > > > > >> > > > > with pause/resume.
> >> > > > > >> > > > >
> >> > > > > >> > > > > -Jason
> >> > > > > >> > > > >
> >> > > > > >> > > > > On Mon, Jan 4, 2016 at 9:46
AM, Gwen Shapira <
> >> > > > gwen@confluent.io
> >> > > > > >
> >> > > > > >> > > wrote:
> >> > > > > >> > > > >
> >> > > > > >> > > > > > The wiki you pointed to is no longer maintained
> >> > > > > >> > > > > > and fell out of sync with the code and protocol.
> >> > > > > >> > > > > >
> >> > > > > >> > > > > > You may want to refer to:
> >> > > > > >> > > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >> > > > > >> > > > > >
> >> > > > > >> > > > > > On Mon, Jan 4, 2016 at
4:38 AM, Jens Rantil <
> >> > > > > >> jens.rantil@tink.se>
> >> > > > > >> > > > wrote:
> >> > > > > >> > > > > >
> >> > > > > >> > > > > > > Hi guys,
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > > I realized I never thanked y'all for your input
> >> > > > > >> > > > > > > - thanks!
> >> > > > > >> > > > > > > Jason: I apologize for assuming your stance on
> >> > > > > >> > > > > > > the issue! Feels like we all agreed on the
> >> > > > > >> > > > > > > solution. +1
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > > Follow-up: Jason made a point about defining
> >> > > > > >> > > > > > > prefetch and fairness behaviour in the KIP. I am
> >> > > > > >> > > > > > > now working on putting that down in writing. To
> >> > > > > >> > > > > > > be able to do this I think I need to understand
> >> > > > > >> > > > > > > the current prefetch behaviour in the new
> >> > > > > >> > > > > > > consumer API (0.9) a bit better. Some specific
> >> > > > > >> > > > > > > questions:
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > >    - How does a specific consumer balance
> >> > > > > >> > > > > > >    incoming messages from multiple partitions?
> >> > > > > >> > > > > > >    Is the consumer simply issuing Multi-Fetch
> >> > > > > >> > > > > > >    requests[1] for the assigned partitions of
> >> > > > > >> > > > > > >    the relevant topics? Or is the consumer
> >> > > > > >> > > > > > >    fetching from one partition at a time and
> >> > > > > >> > > > > > >    balancing between them internally? That is,
> >> > > > > >> > > > > > >    is the responsibility of partition balancing
> >> > > > > >> > > > > > >    (and fairness) on the broker side or the
> >> > > > > >> > > > > > >    consumer side?
> >> > > > > >> > > > > > >    - Is the above documented somewhere?
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > > [1]
> https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> >> > > > > >> > > > > > > , see "Multi-Fetch".
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > > Thanks,
> >> > > > > >> > > > > > > Jens
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > > On Wed, Dec 23, 2015
at 2:44 AM, Ismael Juma <
> >> > > > > >> ismael@juma.me.uk>
> >> > > > > >> > > > > wrote:
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > > > On Wed, Dec
23, 2015 at 1:24 AM, Gwen Shapira <
> >> > > > > >> > gwen@confluent.io
> >> > > > > >> > > >
> >> > > > > >> > > > > > wrote:
> >> > > > > >> > > > > > > >
> >> > > > > >> > > > > > > > > Given the background, it sounds like you'll
> >> > > > > >> > > > > > > > > generally want each call to poll() to return
> >> > > > > >> > > > > > > > > the same number of events (which is the
> >> > > > > >> > > > > > > > > number you planned on having enough memory /
> >> > > > > >> > > > > > > > > time for). It also sounds like tuning the
> >> > > > > >> > > > > > > > > number of events will be closely tied to
> >> > > > > >> > > > > > > > > tuning the session timeout. That is - if I
> >> > > > > >> > > > > > > > > choose to lower the session timeout for some
> >> > > > > >> > > > > > > > > reason, I will have to modify the number of
> >> > > > > >> > > > > > > > > records returned too.
> >> > > > > >> > > > > > > > >
> >> > > > > >> > > > > > > > > If those assumptions are correct, I think a
> >> > > > > >> > > > > > > > > configuration makes more sense.
> >> > > > > >> > > > > > > > > 1. We are unlikely to want this parameter to
> >> > > > > >> > > > > > > > > change during the lifetime of the consumer.
> >> > > > > >> > > > > > > > > 2. The correct value is tied to another
> >> > > > > >> > > > > > > > > configuration parameter, so they will be
> >> > > > > >> > > > > > > > > controlled together.
> >> > > > > >> > > > > > > > >
> >> > > > > >> > > > > > > >
> >> > > > > >> > > > > > > > I was thinking
the same thing.
> >> > > > > >> > > > > > > >
> >> > > > > >> > > > > > > > Ismael
> >> > > > > >> > > > > > > >
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > > --
> >> > > > > >> > > > > > > Jens Rantil
> >> > > > > >> > > > > > > Backend engineer
> >> > > > > >> > > > > > > Tink AB
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > > Email: jens.rantil@tink.se
> >> > > > > >> > > > > > > Phone: +46 708 84
18 32
> >> > > > > >> > > > > > > Web: www.tink.se
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > > > Facebook <https://www.facebook.com/#!/tink.se>
> >> > > > > >> > > > > > > Linkedin
> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> >> > > > > >> > > > > > > Twitter <https://twitter.com/tink>
> >> > > > > >> > > > > > >
> >> > > > > >> > > > > >
> >> > > > > >> > > > >
> >> > > > > >> > > >
> >> > > > > >> > >
> >> > > > > >> >
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> --
> >> > > > > >> Cliff Rhyne
> >> > > > > >> Software Engineering Lead
> >> > > > > >> e: crhyne@signal.co
> >> > > > > >> signal.co
> >> > > > > >> ________________________
> >> > > > > >>
> >> > > > > >> Cut Through the Noise
> >> > > > > >>
> >> > > > > >> This e-mail and any files transmitted with it are
for the
> sole
> >> use
> >> > > of
> >> > > > > the
> >> > > > > >> intended recipient(s) and may contain confidential
and
> >> privileged
> >> > > > > >> information. Any unauthorized use of this email
is strictly
> >> > > > prohibited.
> >> > > > > >> ©2015 Signal. All rights reserved.
> >> > > > > >>
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
