kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gwen Shapira <g...@confluent.io>
Subject Re: KIP-41: KafkaConsumer Max Records
Date Thu, 07 Jan 2016 02:14:15 GMT
I like the fair-consumption approach you chose - "pull as many records as
possible from each partition in a similar round-robin fashion", it is very
intuitive and close enough to fair.

Overall, I'm +1 on the KIP. But you'll need a formal vote :)

On Wed, Jan 6, 2016 at 6:05 PM, Jason Gustafson <jason@confluent.io> wrote:

> Thanks for the suggestion, Ismael. I updated the KIP.
>
> -Jason
>
> On Wed, Jan 6, 2016 at 6:57 AM, Ismael Juma <ismael@juma.me.uk> wrote:
>
> > Thanks Jason. I read the KIP and it makes sense to me. A minor
> suggestion:
> > in the "Ensuring Fair Consumption" section, there are 3 paragraphs with 2
> > examples (2 partitions with 100 max.poll.records and 3 partitions with 30
> > max.poll.records). I think you could simplify this by using one of the
> > examples in the 3 paragraphs.
> >
> > Ismael
> >
> > On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson <jason@confluent.io>
> > wrote:
> >
> > > I've updated the KIP with some implementation details. I also added
> more
> > > discussion on the heartbeat() alternative. The short answer for why we
> > > rejected this API is that it doesn't seem to work well with offset
> > commits.
> > > This would tend to make correct usage complicated and difficult to
> > explain.
> > > Additionally, we don't see any clear advantages over having a way to
> set
> > > the max records. For example, using max.records=1 would be equivalent
> to
> > > invoking heartbeat() on each iteration of the message processing loop.
> > >
> > > Going back to the discussion on whether we should use a configuration
> > value
> > > or overload poll(), I'm leaning toward the configuration option mainly
> > for
> > > compatibility and to keep the KafkaConsumer API from getting any more
> > > complex. Also, as others have mentioned, it seems reasonable to want to
> > > tune this setting in the same place that the session timeout and
> > heartbeat
> > > interval are configured. I still feel a little uncomfortable with the
> > need
> > > to do a lot of configuration tuning to get the consumer working for a
> > > particular environment, but hopefully the defaults are conservative
> > enough
> > > that most users won't need to. However, if it remains a problem, then
> we
> > > could still look into better options for managing the size of batches
> > > including overloading poll() with a max records argument or possibly by
> > > implementing a batch scaling algorithm internally.
> > >
> > > -Jason
> > >
> > >
> > > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <jason@confluent.io>
> > > wrote:
> > >
> > > > Hi Cliff,
> > > >
> > > > I think we're all agreed that the current contract of poll() should
> be
> > > > kept. The consumer wouldn't wait for max messages to become available
> > in
> > > > this proposal; it would only sure that it never returns more than max
> > > > messages.
> > > >
> > > > -Jason
> > > >
> > > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crhyne@signal.co>
> wrote:
> > > >
> > > >> Instead of a heartbeat, I'd prefer poll() to return whatever
> messages
> > > the
> > > >> client has.  Either a) I don't care if I get less than my max
> message
> > > >> limit
> > > >> or b) I do care and will set a larger timeout.  Case B is less
> common
> > > than
> > > >> A and is fairly easy to handle in the application's code.
> > > >>
> > > >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <gwen@confluent.io>
> > wrote:
> > > >>
> > > >> > 1. Agree that TCP window style scaling will be cool. I'll try
to
> > think
> > > >> of a
> > > >> > good excuse to use it ;)
> > > >> >
> > > >> > 2. I'm very concerned about the challenges of getting the
> timeouts,
> > > >> > hearbeats and max messages right.
> > > >> >
> > > >> > Another option could be to expose "heartbeat" API to consumers.
If
> > my
> > > >> app
> > > >> > is still processing data but is still alive, it could initiate
a
> > > >> heartbeat
> > > >> > to signal its alive without having to handle additional messages.
> > > >> >
> > > >> > I don't know if this improves more than it complicates though
:(
> > > >> >
> > > >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <
> > jason@confluent.io>
> > > >> > wrote:
> > > >> >
> > > >> > > Hey Gwen,
> > > >> > >
> > > >> > > I was thinking along the lines of TCP window scaling in
order to
> > > >> > > dynamically find a good consumption rate. Basically you'd
start
> > off
> > > >> > > consuming say 100 records and you'd let it increase until
the
> > > >> consumption
> > > >> > > took longer than half the session timeout (for example).
You
> > /might/
> > > >> be
> > > >> > > able to achieve the same thing using pause/resume, but it
would
> > be a
> > > >> lot
> > > >> > > trickier since you have to do it at the granularity of
> partitions.
> > > But
> > > >> > > yeah, database write performance doesn't always scale in
a
> > > predictable
> > > >> > > enough way to accommodate this, so I'm not sure how useful
it
> > would
> > > >> be in
> > > >> > > practice. It might also be more difficult to implement since
it
> > > >> wouldn't
> > > >> > be
> > > >> > > as clear when to initiate the next fetch. With a static
setting,
> > the
> > > >> > > consumer knows exactly how many records will be returned
on the
> > next
> > > >> call
> > > >> > > to poll() and can send fetches accordingly.
> > > >> > >
> > > >> > > On the other hand, I do feel a little wary of the need to
tune
> the
> > > >> > session
> > > >> > > timeout and max messages though since these settings might
> depend
> > on
> > > >> the
> > > >> > > environment that the consumer is deployed in. It wouldn't
be a
> big
> > > >> deal
> > > >> > if
> > > >> > > the impact was relatively minor, but getting them wrong
can
> cause
> > a
> > > >> lot
> > > >> > of
> > > >> > > rebalance churn which could keep the consumer from making
any
> > > >> progress.
> > > >> > > It's not a particularly graceful failure.
> > > >> > >
> > > >> > > -Jason
> > > >> > >
> > > >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <
> gwen@confluent.io>
> > > >> wrote:
> > > >> > >
> > > >> > > > I can't speak to all use-cases, but for the database
one, I
> > think
> > > >> > > > pause-resume will be necessary in any case, and therefore
> > dynamic
> > > >> batch
> > > >> > > > sizes are not needed.
> > > >> > > >
> > > >> > > > Databases are really unexpected regarding response
times -
> load
> > > and
> > > >> > > locking
> > > >> > > > can affect this. I'm not sure there's a good way to
know you
> are
> > > >> going
> > > >> > > into
> > > >> > > > rebalance hell before it is too late. So if I were
writing
> code
> > > that
> > > >> > > > updates an RDBMS based on Kafka, I'd pick a reasonable
batch
> > size
> > > >> (say
> > > >> > > 5000
> > > >> > > > records), and basically pause, batch-insert all records,
> commit
> > > and
> > > >> > > resume.
> > > >> > > >
> > > >> > > > Does that make sense?
> > > >> > > >
> > > >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <
> > > >> jason@confluent.io>
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > > > Gwen and Ismael,
> > > >> > > > >
> > > >> > > > > I agree the configuration option is probably the
way to go,
> > but
> > > I
> > > >> was
> > > >> > > > > wondering whether there would be cases where it
made sense
> to
> > > let
> > > >> the
> > > >> > > > > consumer dynamically set max messages to adjust
for
> downstream
> > > >> > > slowness.
> > > >> > > > > For example, if the consumer is writing consumed
records to
> > > >> another
> > > >> > > > > database, and that database is experiencing heavier
than
> > > expected
> > > >> > load,
> > > >> > > > > then the consumer could halve its current max
messages in
> > order
> > > to
> > > >> > > adapt
> > > >> > > > > without risking rebalance hell. It could then
increase max
> > > >> messages
> > > >> > as
> > > >> > > > the
> > > >> > > > > load on the database decreases. It's basically
an easier way
> > to
> > > >> > handle
> > > >> > > > flow
> > > >> > > > > control than we provide with pause/resume.
> > > >> > > > >
> > > >> > > > > -Jason
> > > >> > > > >
> > > >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <
> > gwen@confluent.io
> > > >
> > > >> > > wrote:
> > > >> > > > >
> > > >> > > > > > The wiki you pointed to is no longer maintained
and fell
> out
> > > of
> > > >> > sync
> > > >> > > > with
> > > >> > > > > > the code and protocol.
> > > >> > > > > >
> > > >> > > > > > You may want  to refer to:
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > >> > > > > >
> > > >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil
<
> > > >> jens.rantil@tink.se>
> > > >> > > > wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi guys,
> > > >> > > > > > >
> > > >> > > > > > > I realized I never thanked yall for
your input - thanks!
> > > >> > > > > > > Jason: I apologize for assuming your
stance on the
> issue!
> > > >> Feels
> > > >> > > like
> > > >> > > > we
> > > >> > > > > > all
> > > >> > > > > > > agreed on the solution. +1
> > > >> > > > > > >
> > > >> > > > > > > Follow-up: Jason made a point about
defining prefetch
> and
> > > >> > fairness
> > > >> > > > > > > behaviour in the KIP. I am now working
on putting that
> > down
> > > in
> > > >> > > > writing.
> > > >> > > > > > To
> > > >> > > > > > > do be able to do this I think I need
to understand the
> > > current
> > > >> > > > prefetch
> > > >> > > > > > > behaviour in the new consumer API (0.9)
a bit better.
> Some
> > > >> > specific
> > > >> > > > > > > questions:
> > > >> > > > > > >
> > > >> > > > > > >    - How does a specific consumer balance
incoming
> > messages
> > > >> from
> > > >> > > > > multiple
> > > >> > > > > > >    partitions? Is the consumer simply
issuing
> Multi-Fetch
> > > >> > > requests[1]
> > > >> > > > > for
> > > >> > > > > > > the
> > > >> > > > > > >    consumed assigned partitions of the
relevant topics?
> Or
> > > is
> > > >> the
> > > >> > > > > > consumer
> > > >> > > > > > >    fetching from one partition at a
time and balancing
> > > between
> > > >> > them
> > > >> > > > > > >    internally? That is, is the responsibility
of
> partition
> > > >> > > balancing
> > > >> > > > > (and
> > > >> > > > > > >    fairness) on the broker side or consumer
side?
> > > >> > > > > > >    - Is the above documented somewhere?
> > > >> > > > > > >
> > > >> > > > > > > [1]
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> > > >> > > > > > > ,
> > > >> > > > > > > see "Multi-Fetch".
> > > >> > > > > > >
> > > >> > > > > > > Thanks,
> > > >> > > > > > > Jens
> > > >> > > > > > >
> > > >> > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael
Juma <
> > > >> ismael@juma.me.uk>
> > > >> > > > > wrote:
> > > >> > > > > > >
> > > >> > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM,
Gwen Shapira <
> > > >> > gwen@confluent.io
> > > >> > > >
> > > >> > > > > > wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > Given the background, it sounds
like you'll
> generally
> > > want
> > > >> > each
> > > >> > > > > call
> > > >> > > > > > to
> > > >> > > > > > > > > poll() to return the same
number of events (which is
> > the
> > > >> > number
> > > >> > > > you
> > > >> > > > > > > > planned
> > > >> > > > > > > > > on having enough memory /
time for). It also sounds
> > like
> > > >> > tuning
> > > >> > > > the
> > > >> > > > > > > > number
> > > >> > > > > > > > > of events will be closely
tied to tuning the session
> > > >> timeout.
> > > >> > > > That
> > > >> > > > > > is -
> > > >> > > > > > > > if
> > > >> > > > > > > > > I choose to lower the session
timeout for some
> > reason, I
> > > >> will
> > > >> > > > have
> > > >> > > > > to
> > > >> > > > > > > > > modify the number of records
returning too.
> > > >> > > > > > > > >
> > > >> > > > > > > > > If those assumptions are correct,
I think a
> > > configuration
> > > >> > makes
> > > >> > > > > more
> > > >> > > > > > > > sense.
> > > >> > > > > > > > > 1. We are unlikely to want
this parameter to be
> change
> > > at
> > > >> the
> > > >> > > > > > lifetime
> > > >> > > > > > > of
> > > >> > > > > > > > > the consumer
> > > >> > > > > > > > > 2. The correct value is tied
to another
> configuration
> > > >> > > parameter,
> > > >> > > > so
> > > >> > > > > > > they
> > > >> > > > > > > > > will be controlled together.
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > I was thinking the same thing.
> > > >> > > > > > > >
> > > >> > > > > > > > Ismael
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > --
> > > >> > > > > > > Jens Rantil
> > > >> > > > > > > Backend engineer
> > > >> > > > > > > Tink AB
> > > >> > > > > > >
> > > >> > > > > > > Email: jens.rantil@tink.se
> > > >> > > > > > > Phone: +46 708 84 18 32
> > > >> > > > > > > Web: www.tink.se
> > > >> > > > > > >
> > > >> > > > > > > Facebook <https://www.facebook.com/#!/tink.se>
Linkedin
> > > >> > > > > > > <
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> > > >> > > > > > > >
> > > >> > > > > > >  Twitter <https://twitter.com/tink>
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> Cliff Rhyne
> > > >> Software Engineering Lead
> > > >> e: crhyne@signal.co
> > > >> signal.co
> > > >> ________________________
> > > >>
> > > >> Cut Through the Noise
> > > >>
> > > >> This e-mail and any files transmitted with it are for the sole use
> of
> > > the
> > > >> intended recipient(s) and may contain confidential and privileged
> > > >> information. Any unauthorized use of this email is strictly
> > prohibited.
> > > >> ©2015 Signal. All rights reserved.
> > > >>
> > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message