kafka-dev mailing list archives

From Jason Gustafson <ja...@confluent.io>
Subject Re: KIP-41: KafkaConsumer Max Records
Date Fri, 08 Jan 2016 17:42:34 GMT
Hi Aarti,

Thanks for the feedback. I think the concern about memory overhead is
valid. As Guozhang mentioned, the problem already exists in the current
consumer, so this probably deserves consideration outside of this KIP. That
said, it's a good question whether our prefetching strategy makes it more
difficult to control the memory overhead. The approach we've proposed for
prefetching is basically the following: fetch all partitions whenever the
number of retained messages is less than max.poll.records. In the worst
case, this increases the maximum memory used by the consumer by the size of
those retained messages. As you've pointed out, messages could be very
large. We could reduce this requirement with a slight change: instead of
fetching all partitions, we could fetch only those with no retained data.
That would reduce the worst-case overhead to (number of partitions) *
max.partition.fetch.bytes, which matches the existing memory overhead.
Would that address your concern?
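
To make the two prefetch variants concrete, here is a minimal Python sketch.
It is not the consumer's actual implementation: the function names and the
dictionary of retained record counts are hypothetical, and the partition
names are example values only.

```python
def partitions_to_fetch(assigned, retained, max_poll_records, only_empty=True):
    """Choose which partitions to include in the next fetch (hypothetical helper).

    assigned: list of partition ids assigned to this consumer
    retained: dict mapping partition id -> records buffered but not yet returned
    only_empty: True for the variant that skips partitions with retained data
    """
    total_retained = sum(retained.get(p, 0) for p in assigned)
    if total_retained >= max_poll_records:
        # Enough records are already buffered for the next poll(); no prefetch.
        return []
    if only_empty:
        # Variant: fetch only partitions that have no retained data.
        return [p for p in assigned if retained.get(p, 0) == 0]
    # Original proposal: fetch all partitions.
    return list(assigned)


def worst_case_fetch_bytes(num_partitions, max_partition_fetch_bytes):
    """Upper bound on fetched data when each partition holds at most one fetch."""
    return num_partitions * max_partition_fetch_bytes
```

For example, with 10 assigned partitions and max.partition.fetch.bytes set to
1048576, worst_case_fetch_bytes(10, 1048576) gives 10485760 bytes (10 MiB).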

A couple of other points are worth mentioning. Users have the option not to
use max.poll.records, in which case the behavior will be the same as in the
current consumer. Additionally, the implementation can be changed over time
without affecting users, so we can adjust it in particular when we address
memory concerns in KAFKA-2045.

On a side note, I'm wondering if it would be useful to extend this KIP to
include a max.poll.bytes? For some use cases, it may make more sense to
control the processing time by the size of data instead of the number of
records. Not that I'm anxious to draw this out, but if we'll need this
setting eventually, we may as well do it now. Thoughts?
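
As a rough illustration of what a byte-based limit could look like next to a
record-count limit, here is a small Python sketch. max.poll.bytes is only a
proposed setting at this point; take_batch and its parameters are
hypothetical names, and the rule that at least one record is always returned
is an assumption of this sketch rather than a decided behavior.

```python
from collections import deque


def take_batch(buffer, max_poll_records, max_poll_bytes=None):
    """Pop records from the front of `buffer` until a limit is reached.

    buffer: deque of bytes objects standing in for serialized records
    max_poll_records: maximum number of records to return
    max_poll_bytes: optional cap on the total size of the returned records
    """
    batch, size = [], 0
    while buffer and len(batch) < max_poll_records:
        next_size = len(buffer[0])
        # Stop before exceeding the byte cap, but never return an empty batch
        # just because the first record is larger than the cap.
        if max_poll_bytes is not None and batch and size + next_size > max_poll_bytes:
            break
        batch.append(buffer.popleft())
        size += next_size
    return batch
```

Records left in the buffer would be returned by later calls, so the byte cap
bounds the work per call without dropping data.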


-Jason

On Fri, Jan 8, 2016 at 1:03 AM, Jens Rantil <jens.rantil@tink.se> wrote:

> Hi,
>
> I just publicly wanted to thank Jason for the work he's done with the KIP
> and say that I've been in touch with him privately back and forth to work
> out some of its details. Thanks!
>
> Since it feels like I initiated this KIP a bit I also want to say that I'm
> happy with it and that its proposal solves the initial issue I reported in
> https://issues.apache.org/jira/browse/KAFKA-2986. That said, I'm open to a
> [VOTE] on my behalf. I propose Jason decides when voting starts.
>
> Cheers and keep up the good work,
> Jens
>
> On Tue, Jan 5, 2016 at 8:32 PM, Jason Gustafson <jason@confluent.io>
> wrote:
>
> > I've updated the KIP with some implementation details. I also added more
> > discussion on the heartbeat() alternative. The short answer for why we
> > rejected this API is that it doesn't seem to work well with offset
> > commits. This would tend to make correct usage complicated and difficult
> > to explain. Additionally, we don't see any clear advantages over having a
> > way to set the max records. For example, using max.records=1 would be
> > equivalent to invoking heartbeat() on each iteration of the message
> > processing loop.
> >
> > Going back to the discussion on whether we should use a configuration
> > value or overload poll(), I'm leaning toward the configuration option
> > mainly for compatibility and to keep the KafkaConsumer API from getting
> > any more complex. Also, as others have mentioned, it seems reasonable to
> > want to tune this setting in the same place that the session timeout and
> > heartbeat interval are configured. I still feel a little uncomfortable
> > with the need to do a lot of configuration tuning to get the consumer
> > working for a particular environment, but hopefully the defaults are
> > conservative enough that most users won't need to. However, if it remains
> > a problem, then we could still look into better options for managing the
> > size of batches, including overloading poll() with a max records argument
> > or possibly implementing a batch scaling algorithm internally.
> >
> > -Jason
> >
> >
> > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <jason@confluent.io>
> > wrote:
> >
> > > Hi Cliff,
> > >
> > > I think we're all agreed that the current contract of poll() should be
> > > kept. The consumer wouldn't wait for max messages to become available in
> > > this proposal; it would only ensure that it never returns more than max
> > > messages.
> > >
> > > -Jason
> > >
> > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crhyne@signal.co> wrote:
> > >
> > >> Instead of a heartbeat, I'd prefer poll() to return whatever messages
> > >> the client has. Either a) I don't care if I get fewer than my max
> > >> message limit or b) I do care and will set a larger timeout. Case B is
> > >> less common than A and is fairly easy to handle in the application's
> > >> code.
> > >>
> > >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <gwen@confluent.io>
> > >> wrote:
> > >>
> > >> > 1. Agree that TCP window style scaling will be cool. I'll try to
> > >> > think of a good excuse to use it ;)
> > >> >
> > >> > 2. I'm very concerned about the challenges of getting the timeouts,
> > >> > heartbeats and max messages right.
> > >> >
> > >> > Another option could be to expose a "heartbeat" API to consumers. If
> > >> > my app is still processing data but is still alive, it could initiate
> > >> > a heartbeat to signal it's alive without having to handle additional
> > >> > messages.
> > >> >
> > >> > I don't know if this improves more than it complicates though :(
> > >> >
> > >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <jason@confluent.io>
> > >> > wrote:
> > >> >
> > >> > > Hey Gwen,
> > >> > >
> > >> > > I was thinking along the lines of TCP window scaling in order to
> > >> > > dynamically find a good consumption rate. Basically you'd start off
> > >> > > consuming say 100 records and you'd let it increase until the
> > >> > > consumption took longer than half the session timeout (for
> > >> > > example). You /might/ be able to achieve the same thing using
> > >> > > pause/resume, but it would be a lot trickier since you have to do it
> > >> > > at the granularity of partitions. But yeah, database write
> > >> > > performance doesn't always scale in a predictable enough way to
> > >> > > accommodate this, so I'm not sure how useful it would be in
> > >> > > practice. It might also be more difficult to implement since it
> > >> > > wouldn't be as clear when to initiate the next fetch. With a static
> > >> > > setting, the consumer knows exactly how many records will be
> > >> > > returned on the next call to poll() and can send fetches
> > >> > > accordingly.
> > >> > >
> > >> > > On the other hand, I do feel a little wary of the need to tune the
> > >> > > session timeout and max messages though since these settings might
> > >> > > depend on the environment that the consumer is deployed in. It
> > >> > > wouldn't be a big deal if the impact was relatively minor, but
> > >> > > getting them wrong can cause a lot of rebalance churn which could
> > >> > > keep the consumer from making any progress. It's not a particularly
> > >> > > graceful failure.
> > >> > >
> > >> > > -Jason
> > >> > >
> > >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <gwen@confluent.io>
> > >> > > wrote:
> > >> > >
> > >> > > > I can't speak to all use-cases, but for the database one, I think
> > >> > > > pause-resume will be necessary in any case, and therefore dynamic
> > >> > > > batch sizes are not needed.
> > >> > > >
> > >> > > > Databases are really unpredictable regarding response times: load
> > >> > > > and locking can affect this. I'm not sure there's a good way to
> > >> > > > know you are going into rebalance hell before it is too late. So if
> > >> > > > I were writing code that updates an RDBMS based on Kafka, I'd pick
> > >> > > > a reasonable batch size (say 5000 records), and basically pause,
> > >> > > > batch-insert all records, commit and resume.
> > >> > > >
> > >> > > > Does that make sense?
> > >> > > >
> > >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <jason@confluent.io>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Gwen and Ismael,
> > >> > > > >
> > >> > > > > I agree the configuration option is probably the way to go, but I
> > >> > > > > was wondering whether there would be cases where it made sense to
> > >> > > > > let the consumer dynamically set max messages to adjust for
> > >> > > > > downstream slowness. For example, if the consumer is writing
> > >> > > > > consumed records to another database, and that database is
> > >> > > > > experiencing heavier than expected load, then the consumer could
> > >> > > > > halve its current max messages in order to adapt without risking
> > >> > > > > rebalance hell. It could then increase max messages as the load
> > >> > > > > on the database decreases. It's basically an easier way to handle
> > >> > > > > flow control than we provide with pause/resume.
> > >> > > > >
> > >> > > > > -Jason
> > >> > > > >
> > >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <gwen@confluent.io>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > The wiki you pointed to is no longer maintained and fell out of
> > >> > > > > > sync with the code and protocol.
> > >> > > > > >
> > >> > > > > > You may want to refer to:
> > >> > > > > >
> > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >> > > > > >
> > >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.rantil@tink.se>
> > >> > > > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hi guys,
> > >> > > > > > >
> > >> > > > > > > I realized I never thanked y'all for your input - thanks!
> > >> > > > > > > Jason: I apologize for assuming your stance on the issue!
> > >> > > > > > > Feels like we all agreed on the solution. +1
> > >> > > > > > >
> > >> > > > > > > Follow-up: Jason made a point about defining prefetch and
> > >> > > > > > > fairness behaviour in the KIP. I am now working on putting
> > >> > > > > > > that down in writing. To be able to do this I think I need to
> > >> > > > > > > understand the current prefetch behaviour in the new consumer
> > >> > > > > > > API (0.9) a bit better. Some specific questions:
> > >> > > > > > >
> > >> > > > > > >    - How does a specific consumer balance incoming messages
> > >> > > > > > >    from multiple partitions? Is the consumer simply issuing
> > >> > > > > > >    Multi-Fetch requests[1] for the consumed assigned
> > >> > > > > > >    partitions of the relevant topics? Or is the consumer
> > >> > > > > > >    fetching from one partition at a time and balancing between
> > >> > > > > > >    them internally? That is, is the responsibility of
> > >> > > > > > >    partition balancing (and fairness) on the broker side or
> > >> > > > > > >    consumer side?
> > >> > > > > > >    - Is the above documented somewhere?
> > >> > > > > > >
> > >> > > > > > > [1]
> > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka,
> > >> > > > > > > see "Multi-Fetch".
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > > Jens
> > >> > > > > > >
> > >> > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ismael@juma.me.uk>
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <gwen@confluent.io>
> > >> > > > > > > > wrote:
> > >> > > > > > > >
> > >> > > > > > > > > Given the background, it sounds like you'll generally want
> > >> > > > > > > > > each call to poll() to return the same number of events
> > >> > > > > > > > > (which is the number you planned on having enough memory /
> > >> > > > > > > > > time for). It also sounds like tuning the number of events
> > >> > > > > > > > > will be closely tied to tuning the session timeout. That
> > >> > > > > > > > > is - if I choose to lower the session timeout for some
> > >> > > > > > > > > reason, I will have to modify the number of records
> > >> > > > > > > > > returned too.
> > >> > > > > > > > >
> > >> > > > > > > > > If those assumptions are correct, I think a configuration
> > >> > > > > > > > > makes more sense.
> > >> > > > > > > > > 1. We are unlikely to want this parameter to be changed
> > >> > > > > > > > > during the lifetime of the consumer
> > >> > > > > > > > > 2. The correct value is tied to another configuration
> > >> > > > > > > > > parameter, so they will be controlled together.
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > I was thinking the same thing.
> > >> > > > > > > >
> > >> > > > > > > > Ismael
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > --
> > >> > > > > > > Jens Rantil
> > >> > > > > > > Backend engineer
> > >> > > > > > > Tink AB
> > >> > > > > > >
> > >> > > > > > > Email: jens.rantil@tink.se
> > >> > > > > > > Phone: +46 708 84 18 32
> > >> > > > > > > Web: www.tink.se
> > >> > > > > > >
> > >> > > > > > > Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> > >> > > > > > > <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> > >> > > > > > > Twitter <https://twitter.com/tink>
> > >>
> > >>
> > >> --
> > >> Cliff Rhyne
> > >> Software Engineering Lead
> > >> e: crhyne@signal.co
> > >> signal.co
> > >> ________________________
> > >>
> > >> Cut Through the Noise
> > >>
> > >> This e-mail and any files transmitted with it are for the sole use of
> > >> the intended recipient(s) and may contain confidential and privileged
> > >> information. Any unauthorized use of this email is strictly prohibited.
> > >> ©2015 Signal. All rights reserved.
> > >>
> > >
> > >
> >
>
>
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.rantil@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> Twitter <https://twitter.com/tink>
>
