kafka-dev mailing list archives

From Jason Gustafson <ja...@confluent.io>
Subject Re: KIP-41: KafkaConsumer Max Records
Date Thu, 07 Jan 2016 02:05:49 GMT
Thanks for the suggestion, Ismael. I updated the KIP.

-Jason

On Wed, Jan 6, 2016 at 6:57 AM, Ismael Juma <ismael@juma.me.uk> wrote:

> Thanks Jason. I read the KIP and it makes sense to me. A minor suggestion:
> in the "Ensuring Fair Consumption" section, there are 3 paragraphs with 2
> examples (2 partitions with 100 max.poll.records and 3 partitions with 30
> max.poll.records). I think you could simplify this by using one of the
> examples in the 3 paragraphs.
>
> Ismael
>
> On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson <jason@confluent.io>
> wrote:
>
> > I've updated the KIP with some implementation details. I also added more
> > discussion on the heartbeat() alternative. The short answer for why we
> > rejected this API is that it doesn't seem to work well with offset commits.
> > This would tend to make correct usage complicated and difficult to explain.
> > Additionally, we don't see any clear advantages over having a way to set
> > the max records. For example, using max.records=1 would be equivalent to
> > invoking heartbeat() on each iteration of the message processing loop.
> >
> > Going back to the discussion on whether we should use a configuration value
> > or overload poll(), I'm leaning toward the configuration option mainly for
> > compatibility and to keep the KafkaConsumer API from getting any more
> > complex. Also, as others have mentioned, it seems reasonable to want to
> > tune this setting in the same place that the session timeout and heartbeat
> > interval are configured. I still feel a little uncomfortable with the need
> > to do a lot of configuration tuning to get the consumer working for a
> > particular environment, but hopefully the defaults are conservative enough
> > that most users won't need to. However, if it remains a problem, then we
> > could still look into better options for managing the size of batches
> > including overloading poll() with a max records argument or possibly by
> > implementing a batch scaling algorithm internally.
> >
> > -Jason
> >
> >
> > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <jason@confluent.io>
> > wrote:
> >
> > > Hi Cliff,
> > >
> > > > I think we're all agreed that the current contract of poll() should be
> > > > kept. The consumer wouldn't wait for max messages to become available in
> > > > this proposal; it would only ensure that it never returns more than max
> > > > messages.
> > >
> > > -Jason
> > >
> > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crhyne@signal.co> wrote:
> > >
> > > >> Instead of a heartbeat, I'd prefer poll() to return whatever messages the
> > > >> client has.  Either a) I don't care if I get less than my max message
> > > >> limit or b) I do care and will set a larger timeout.  Case B is less
> > > >> common than A and is fairly easy to handle in the application's code.
> > >>
> > >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <gwen@confluent.io>
> wrote:
> > >>
> > > >> > 1. Agree that TCP window style scaling will be cool. I'll try to think
> > > >> > of a good excuse to use it ;)
> > > >> >
> > > >> > 2. I'm very concerned about the challenges of getting the timeouts,
> > > >> > heartbeats and max messages right.
> > > >> >
> > > >> > Another option could be to expose a "heartbeat" API to consumers. If my
> > > >> > app is still processing data but is still alive, it could initiate a
> > > >> > heartbeat to signal it's alive without having to handle additional
> > > >> > messages.
> > >> >
> > >> > I don't know if this improves more than it complicates though :(
> > >> >
> > > >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <jason@confluent.io>
> > > >> > wrote:
> > >> >
> > >> > > Hey Gwen,
> > >> > >
> > > >> > > I was thinking along the lines of TCP window scaling in order to
> > > >> > > dynamically find a good consumption rate. Basically you'd start off
> > > >> > > consuming say 100 records and you'd let it increase until the
> > > >> > > consumption took longer than half the session timeout (for example).
> > > >> > > You /might/ be able to achieve the same thing using pause/resume, but
> > > >> > > it would be a lot trickier since you have to do it at the granularity
> > > >> > > of partitions. But yeah, database write performance doesn't always
> > > >> > > scale in a predictable enough way to accommodate this, so I'm not sure
> > > >> > > how useful it would be in practice. It might also be more difficult to
> > > >> > > implement since it wouldn't be as clear when to initiate the next
> > > >> > > fetch. With a static setting, the consumer knows exactly how many
> > > >> > > records will be returned on the next call to poll() and can send
> > > >> > > fetches accordingly.
> > >> > >
> > > >> > > On the other hand, I do feel a little wary of the need to tune the
> > > >> > > session timeout and max messages though since these settings might
> > > >> > > depend on the environment that the consumer is deployed in. It
> > > >> > > wouldn't be a big deal if the impact was relatively minor, but getting
> > > >> > > them wrong can cause a lot of rebalance churn which could keep the
> > > >> > > consumer from making any progress. It's not a particularly graceful
> > > >> > > failure.
> > >> > >
> > >> > > -Jason
> > >> > >
> > >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <gwen@confluent.io>
> > >> wrote:
> > >> > >
> > > >> > > > I can't speak to all use-cases, but for the database one, I think
> > > >> > > > pause-resume will be necessary in any case, and therefore dynamic
> > > >> > > > batch sizes are not needed.
> > > >> > > >
> > > >> > > > Databases are really unpredictable regarding response times - load
> > > >> > > > and locking can affect this. I'm not sure there's a good way to know
> > > >> > > > you are going into rebalance hell before it is too late. So if I
> > > >> > > > were writing code that updates an RDBMS based on Kafka, I'd pick a
> > > >> > > > reasonable batch size (say 5000 records), and basically pause,
> > > >> > > > batch-insert all records, commit and resume.
> > >> > > > Does that make sense?
> > >> > > >
> > > >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <jason@confluent.io>
> > > >> > > > wrote:
> > >> > > >
> > >> > > > > Gwen and Ismael,
> > >> > > > >
> > > >> > > > > I agree the configuration option is probably the way to go, but I
> > > >> > > > > was wondering whether there would be cases where it made sense to
> > > >> > > > > let the consumer dynamically set max messages to adjust for
> > > >> > > > > downstream slowness. For example, if the consumer is writing
> > > >> > > > > consumed records to another database, and that database is
> > > >> > > > > experiencing heavier than expected load, then the consumer could
> > > >> > > > > halve its current max messages in order to adapt without risking
> > > >> > > > > rebalance hell. It could then increase max messages as the load on
> > > >> > > > > the database decreases. It's basically an easier way to handle
> > > >> > > > > flow control than we provide with pause/resume.
> > >> > > > >
> > >> > > > > -Jason
> > >> > > > >
> > > >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <gwen@confluent.io> wrote:
> > >> > > > >
> > > >> > > > > > The wiki you pointed to is no longer maintained and fell out of
> > > >> > > > > > sync with the code and protocol.
> > > >> > > > > >
> > > >> > > > > > You may want to refer to:
> > > >> > > > > >
> > > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >> > > > > >
> > > >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.rantil@tink.se> wrote:
> > >> > > > > >
> > >> > > > > > > Hi guys,
> > >> > > > > > >
> > > >> > > > > > > I realized I never thanked y'all for your input - thanks!
> > > >> > > > > > > Jason: I apologize for assuming your stance on the issue!
> > > >> > > > > > > Feels like we all agreed on the solution. +1
> > > >> > > > > > >
> > > >> > > > > > > Follow-up: Jason made a point about defining prefetch and
> > > >> > > > > > > fairness behaviour in the KIP. I am now working on putting
> > > >> > > > > > > that down in writing. To be able to do this I think I need to
> > > >> > > > > > > understand the current prefetch behaviour in the new consumer
> > > >> > > > > > > API (0.9) a bit better. Some specific questions:
> > >> > > > > > >
> > > >> > > > > > >    - How does a specific consumer balance incoming messages
> > > >> > > > > > >    from multiple partitions? Is the consumer simply issuing
> > > >> > > > > > >    Multi-Fetch requests[1] for the consumer's assigned
> > > >> > > > > > >    partitions of the relevant topics? Or is the consumer
> > > >> > > > > > >    fetching from one partition at a time and balancing between
> > > >> > > > > > >    them internally? That is, is the responsibility of
> > > >> > > > > > >    partition balancing (and fairness) on the broker side or
> > > >> > > > > > >    consumer side?
> > > >> > > > > > >    - Is the above documented somewhere?
> > >> > > > > > >
> > >> > > > > > > [1]
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> > >> > > > > > > ,
> > >> > > > > > > see "Multi-Fetch".
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > > Jens
> > >> > > > > > >
> > > >> > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ismael@juma.me.uk> wrote:
> > > >> > > > > > >
> > > >> > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <gwen@confluent.io> wrote:
> > >> > > > > > > >
> > > >> > > > > > > > > Given the background, it sounds like you'll generally
> > > >> > > > > > > > > want each call to poll() to return the same number of
> > > >> > > > > > > > > events (which is the number you planned on having enough
> > > >> > > > > > > > > memory / time for). It also sounds like tuning the
> > > >> > > > > > > > > number of events will be closely tied to tuning the
> > > >> > > > > > > > > session timeout. That is - if I choose to lower the
> > > >> > > > > > > > > session timeout for some reason, I will have to modify
> > > >> > > > > > > > > the number of records returned too.
> > > >> > > > > > > > >
> > > >> > > > > > > > > If those assumptions are correct, I think a
> > > >> > > > > > > > > configuration makes more sense.
> > > >> > > > > > > > > 1. We are unlikely to want this parameter to be changed
> > > >> > > > > > > > > during the lifetime of the consumer
> > > >> > > > > > > > > 2. The correct value is tied to another configuration
> > > >> > > > > > > > > parameter, so they will be controlled together.
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > I was thinking the same thing.
> > >> > > > > > > >
> > >> > > > > > > > Ismael
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > --
> > >> > > > > > > Jens Rantil
> > >> > > > > > > Backend engineer
> > >> > > > > > > Tink AB
> > >> > > > > > >
> > >> > > > > > > Email: jens.rantil@tink.se
> > >> > > > > > > Phone: +46 708 84 18 32
> > >> > > > > > > Web: www.tink.se
> > >> > > > > > >
> > >> > > > > > > Facebook <https://www.facebook.com/#!/tink.se>
Linkedin
> > >> > > > > > > <
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> > >> > > > > > > >
> > >> > > > > > >  Twitter <https://twitter.com/tink>
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> Cliff Rhyne
> > >> Software Engineering Lead
> > >> e: crhyne@signal.co
> > >> signal.co
> > >>
> > >
> > >
> >
>
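
The batch scaling idea raised earlier in the thread (start at around 100
records, let the size grow until processing takes longer than half the session
timeout, and halve it when the downstream system slows down) could be sketched
roughly as below. This is only an illustration in Python, not anything from
the KIP or the consumer implementation: the BatchSizeScaler class, its
parameter names, and the additive step of 100 are assumptions made for the
example, and the 5000 cap simply reuses the batch size mentioned in the thread.

```python
class BatchSizeScaler:
    """Hypothetical helper, not part of any Kafka client.

    Grows the batch size by a fixed step while processing stays fast and
    halves it once a batch takes longer than half the session timeout.
    """

    def __init__(self, session_timeout_s, initial=100, step=100,
                 minimum=1, maximum=5000):
        # A batch is considered "too slow" past half the session timeout.
        self.threshold_s = session_timeout_s / 2.0
        self.step = step
        self.minimum = minimum
        self.maximum = maximum
        self.max_records = initial

    def record(self, processing_time_s):
        """Update and return the batch size after one batch was processed."""
        if processing_time_s > self.threshold_s:
            # Too slow: back off multiplicatively, never below the minimum.
            self.max_records = max(self.minimum, self.max_records // 2)
        else:
            # Fast enough: probe for a larger batch, never above the maximum.
            self.max_records = min(self.maximum, self.max_records + self.step)
        return self.max_records


if __name__ == "__main__":
    scaler = BatchSizeScaler(session_timeout_s=30.0)
    # Simulated per-batch processing times in seconds.
    for elapsed in (1.0, 2.0, 20.0, 3.0):
        print(elapsed, scaler.record(elapsed))
```

In a real consumer loop the elapsed time would be measured around the
processing of each poll() result. As noted in the thread, a dynamic size makes
it less clear when the next fetch should be initiated, which is one reason a
static setting was preferred.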
