kafka-dev mailing list archives

From Ismael Juma <ism...@juma.me.uk>
Subject Re: KIP-41: KafkaConsumer Max Records
Date Wed, 06 Jan 2016 14:57:47 GMT
Thanks Jason. I read the KIP and it makes sense to me. A minor suggestion:
in the "Ensuring Fair Consumption" section, there are 3 paragraphs with 2
examples (2 partitions with 100 max.poll.records and 3 partitions with 30
max.poll.records). I think you could simplify this by using just one of the
examples in all 3 paragraphs.

Ismael
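
The fairness idea from the KIP's "Ensuring Fair Consumption" section can be made concrete with a small model, using the two-partition example with max.poll.records = 100. This is a sketch under assumptions, not the KIP's algorithm: it assumes records have already been prefetched into per-partition queues, and the class RoundRobinBuffer, its methods and the rotate-by-one policy are hypothetical.

```python
from collections import OrderedDict, deque
from typing import Deque, List, Tuple


class RoundRobinBuffer:
    """Hypothetical model of prefetched records that caps each poll()."""

    def __init__(self, max_poll_records: int) -> None:
        self.max_poll_records = max_poll_records
        # Partition order decides which partition is drained first.
        self._queues: "OrderedDict[str, Deque[int]]" = OrderedDict()

    def add_fetched(self, partition: str, offsets: List[int]) -> None:
        """Buffer the offsets returned by one fetch for one partition."""
        self._queues.setdefault(partition, deque()).extend(offsets)

    def poll(self) -> List[Tuple[str, int]]:
        """Return at most max_poll_records (partition, offset) pairs."""
        batch: List[Tuple[str, int]] = []
        for partition, queue in self._queues.items():
            while queue and len(batch) < self.max_poll_records:
                batch.append((partition, queue.popleft()))
            if len(batch) == self.max_poll_records:
                break
        # Rotate by one so a different partition goes first on the next call.
        if self._queues:
            self._queues.move_to_end(next(iter(self._queues)))
        return batch


# Two partitions with 150 buffered records each and max.poll.records = 100.
buf = RoundRobinBuffer(max_poll_records=100)
buf.add_fetched("A", list(range(150)))
buf.add_fetched("B", list(range(150)))
first = buf.poll()   # A:0-99
second = buf.poll()  # B:0-99
third = buf.poll()   # A:100-149, then B:100-149
```

Rotating the starting partition keeps one busy partition from monopolizing every call; draining a single partition until it is empty would be simpler, but could starve the others while new fetches keep arriving.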

On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson <jason@confluent.io> wrote:

> I've updated the KIP with some implementation details. I also added more
> discussion on the heartbeat() alternative. The short answer for why we
> rejected this API is that it doesn't seem to work well with offset commits.
> This would tend to make correct usage complicated and difficult to explain.
> Additionally, we don't see any clear advantages over having a way to set
> the max records. For example, using max.records=1 would be equivalent to
> invoking heartbeat() on each iteration of the message processing loop.
>
> Going back to the discussion on whether we should use a configuration value
> or overload poll(), I'm leaning toward the configuration option mainly for
> compatibility and to keep the KafkaConsumer API from getting any more
> complex. Also, as others have mentioned, it seems reasonable to want to
> tune this setting in the same place that the session timeout and heartbeat
> interval are configured. I still feel a little uncomfortable with the need
> to do a lot of configuration tuning to get the consumer working for a
> particular environment, but hopefully the defaults are conservative enough
> that most users won't need to. However, if it remains a problem, then we
> could still look into better options for managing the size of batches
> including overloading poll() with a max records argument or possibly by
> implementing a batch scaling algorithm internally.
>
> -Jason
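
One way to see why the max records setting belongs next to the session timeout is a simple budget check: the worst-case time to process one batch should stay well inside the session timeout. The sketch below assumes a measured or estimated per-record processing cost and an arbitrary budget of half the session timeout. max_records_for_budget is a hypothetical helper; session.timeout.ms, heartbeat.interval.ms and max.poll.records are the consumer property names for these settings, and the values are illustrative only.

```python
def max_records_for_budget(session_timeout_ms: int,
                           per_record_ms: float,
                           budget_fraction: float = 0.5) -> int:
    """Largest batch whose estimated processing time fits in a fraction
    of the session timeout (never less than one record)."""
    if per_record_ms <= 0:
        raise ValueError("per_record_ms must be positive")
    budget_ms = session_timeout_ms * budget_fraction
    return max(1, int(budget_ms // per_record_ms))


# Illustrative values only; the per-record cost is an application estimate.
consumer_config = {
    "session.timeout.ms": 30000,
    "heartbeat.interval.ms": 3000,
}
consumer_config["max.poll.records"] = max_records_for_budget(
    consumer_config["session.timeout.ms"], per_record_ms=20.0)

# Lowering the session timeout forces a smaller batch for the same workload.
smaller = max_records_for_budget(10000, per_record_ms=20.0)
```

With 20 ms per record, a 30 s session timeout allows 750 records per poll, while a 10 s timeout allows only 250, which is why the two values tend to be tuned together.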
>
>
> On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <jason@confluent.io> wrote:
>
> > Hi Cliff,
> >
> > I think we're all agreed that the current contract of poll() should be
> > kept. The consumer wouldn't wait for max messages to become available in
> > this proposal; it would only ensure that it never returns more than max
> > messages.
> >
> > -Jason
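
The poll() contract described here (return whatever is buffered, never more than the maximum, and never wait just to fill a batch) can be modelled as follows. BoundedPoller and the fetch callable are hypothetical stand-ins for the consumer's internal buffer and network fetches, not the KafkaConsumer API.

```python
import time
from collections import deque
from typing import Callable, Deque, List


class BoundedPoller:
    """Hypothetical model of the poll() contract: never more than
    max_records per call, and no waiting just to fill a batch."""

    def __init__(self, max_records: int, fetch: Callable[[], List[int]]) -> None:
        self.max_records = max_records
        self._fetch = fetch                  # stand-in for a network fetch
        self._buffer: Deque[int] = deque()   # records fetched but not yet returned

    def poll(self, timeout_s: float) -> List[int]:
        deadline = time.monotonic() + timeout_s
        # Only block while the buffer is empty, and only until the timeout.
        while not self._buffer:
            self._buffer.extend(self._fetch())
            if self._buffer or time.monotonic() >= deadline:
                break
            time.sleep(0.001)
        # Hand back what is available now, capped; the rest waits for the next call.
        count = min(self.max_records, len(self._buffer))
        return [self._buffer.popleft() for _ in range(count)]


fetches = iter([[], [1, 2, 3], list(range(4, 20))])
poller = BoundedPoller(max_records=10, fetch=lambda: next(fetches, []))
first = poller.poll(timeout_s=1.0)    # [1, 2, 3]: returned without waiting for 10
second = poller.poll(timeout_s=1.0)   # 4..13: capped at 10
third = poller.poll(timeout_s=1.0)    # 14..19: leftovers from the buffer
empty = poller.poll(timeout_s=0.01)   # []: nothing arrived before the timeout
```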
> >
> > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crhyne@signal.co> wrote:
> >
> >> Instead of a heartbeat, I'd prefer poll() to return whatever messages the
> >> client has. Either a) I don't care if I get fewer than my max message
> >> limit, or b) I do care and will set a larger timeout. Case B is less
> >> common than A and is fairly easy to handle in the application's code.
> >>
> >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <gwen@confluent.io> wrote:
> >>
> >> > 1. Agree that TCP window style scaling will be cool. I'll try to think
> >> > of a good excuse to use it ;)
> >> >
> >> > 2. I'm very concerned about the challenges of getting the timeouts,
> >> > heartbeats and max messages right.
> >> >
> >> > Another option could be to expose a "heartbeat" API to consumers. If my
> >> > app is still processing data and is still alive, it could initiate a
> >> > heartbeat to signal it's alive without having to handle additional
> >> > messages.
> >> >
> >> > I don't know if this improves more than it complicates though :(
> >> >
> >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <jason@confluent.io> wrote:
> >> >
> >> > > Hey Gwen,
> >> > >
> >> > > I was thinking along the lines of TCP window scaling in order to
> >> > > dynamically find a good consumption rate. Basically you'd start off
> >> > > consuming, say, 100 records and let the batch size increase until
> >> > > consumption took longer than half the session timeout (for example).
> >> > > You /might/ be able to achieve the same thing using pause/resume, but
> >> > > it would be a lot trickier since you have to do it at the granularity
> >> > > of partitions. But yeah, database write performance doesn't always
> >> > > scale predictably enough to accommodate this, so I'm not sure how
> >> > > useful it would be in practice. It might also be more difficult to
> >> > > implement since it wouldn't be as clear when to initiate the next
> >> > > fetch. With a static setting, the consumer knows exactly how many
> >> > > records will be returned on the next call to poll() and can send
> >> > > fetches accordingly.
> >> > >
> >> > > On the other hand, I do feel a little wary of the need to tune the
> >> > > session timeout and max messages, since these settings might depend
> >> > > on the environment that the consumer is deployed in. It wouldn't be a
> >> > > big deal if the impact were relatively minor, but getting them wrong
> >> > > can cause a lot of rebalance churn, which could keep the consumer
> >> > > from making any progress. It's not a particularly graceful failure.
> >> > >
> >> > > -Jason
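
The TCP-style idea above, together with the later suggestion in this thread to halve max messages under load, resembles the additive-increase/multiplicative-decrease (AIMD) rule from TCP congestion control. The sketch below swaps in AIMD as an assumption: it grows the limit by a fixed step while a batch finishes within half the session timeout and halves it otherwise. AdaptiveBatchSize and its step and bounds are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class AdaptiveBatchSize:
    """Hypothetical AIMD controller for a per-poll record limit."""
    session_timeout_ms: int
    current: int = 100      # starting point from the example in the thread
    step: int = 100         # additive increase per comfortable batch
    minimum: int = 1
    maximum: int = 10_000

    def record_batch(self, elapsed_ms: float) -> int:
        """Adjust the limit after one batch; return the limit for the next poll."""
        if elapsed_ms > self.session_timeout_ms / 2:
            # Batch took too long: back off quickly (multiplicative decrease).
            self.current = max(self.minimum, self.current // 2)
        else:
            # Batch finished comfortably: probe upward slowly (additive increase).
            self.current = min(self.maximum, self.current + self.step)
        return self.current


controller = AdaptiveBatchSize(session_timeout_ms=30_000)
history = [controller.record_batch(ms) for ms in (2_000, 4_000, 20_000, 18_000, 5_000)]
```

As the message points out, a limit that changes between calls makes it harder for the consumer to decide how much to prefetch, which is one argument for the static setting.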
> >> > >
> >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <gwen@confluent.io> wrote:
> >> > >
> >> > > > I can't speak to all use cases, but for the database one, I think
> >> > > > pause/resume will be necessary in any case, and therefore dynamic
> >> > > > batch sizes are not needed.
> >> > > >
> >> > > > Databases are really unpredictable regarding response times: load
> >> > > > and locking can affect this. I'm not sure there's a good way to know
> >> > > > you are going into rebalance hell before it is too late. So if I were
> >> > > > writing code that updates an RDBMS based on Kafka, I'd pick a
> >> > > > reasonable batch size (say 5000 records), and basically pause,
> >> > > > batch-insert all records, commit and resume.
> >> > > >
> >> > > > Does that make sense?
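
The pause, batch-insert, commit and resume pattern described above is sketched below against an in-memory stand-in. FakeConsumer and pause_insert_commit are hypothetical; their pause/resume/commit methods only mirror the shape of the Java KafkaConsumer's pause(), resume() and commitSync(), and a batch size of 5 stands in for the 5000 records in the example.

```python
from typing import Callable, Dict, List, Set, Tuple

Record = Tuple[str, int]  # (partition, offset)


class FakeConsumer:
    """Hypothetical in-memory consumer with pause/resume/commit."""

    def __init__(self, records: Dict[str, List[int]]) -> None:
        self._records = {p: list(offs) for p, offs in records.items()}
        self._positions = {p: 0 for p in records}
        self.paused: Set[str] = set()
        self.committed: Dict[str, int] = {}

    def poll(self, max_records: int) -> List[Record]:
        """Return up to max_records from partitions that are not paused."""
        out: List[Record] = []
        for p, offs in self._records.items():
            if p in self.paused:
                continue
            while self._positions[p] < len(offs) and len(out) < max_records:
                out.append((p, offs[self._positions[p]]))
                self._positions[p] += 1
        return out

    def pause(self, partitions: Set[str]) -> None:
        self.paused |= partitions

    def resume(self, partitions: Set[str]) -> None:
        self.paused -= partitions

    def commit(self) -> None:
        # Commit the next position to read per partition (Kafka's convention).
        self.committed = dict(self._positions)


def pause_insert_commit(consumer: FakeConsumer,
                        insert_batch: Callable[[List[Record]], None],
                        batch_size: int) -> None:
    """Accumulate batch_size records, then pause, bulk-insert, commit, resume."""
    pending: List[Record] = []
    while True:
        records = consumer.poll(max_records=batch_size - len(pending))
        if not records and not pending:
            break  # nothing buffered and nothing new: stop the demo loop
        pending.extend(records)
        if len(pending) >= batch_size or not records:
            partitions = {p for p, _ in pending}
            consumer.pause(partitions)
            # With the 0.9 consumer, heartbeats are sent from poll(), so a
            # real application would keep polling while a slow insert runs.
            insert_batch(pending)
            consumer.commit()  # safe: every polled record has been inserted
            consumer.resume(partitions)
            pending = []


inserted: List[List[Record]] = []
consumer = FakeConsumer({"p0": list(range(7)), "p1": list(range(5))})
pause_insert_commit(consumer, lambda recs: inserted.append(list(recs)), batch_size=5)
```

Committing only after the whole batch is inserted means a crash mid-insert replays the batch rather than losing it, so the insert should be idempotent or transactional.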
> >> > > >
> >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <jason@confluent.io> wrote:
> >> > > >
> >> > > > > Gwen and Ismael,
> >> > > > >
> >> > > > > I agree the configuration option is probably the way to go, but I
> >> > > > > was wondering whether there would be cases where it made sense to
> >> > > > > let the consumer dynamically set max messages to adjust for
> >> > > > > downstream slowness. For example, if the consumer is writing
> >> > > > > consumed records to another database, and that database is
> >> > > > > experiencing heavier than expected load, then the consumer could
> >> > > > > halve its current max messages in order to adapt without risking
> >> > > > > rebalance hell. It could then increase max messages as the load on
> >> > > > > the database decreases. It's basically an easier way to handle
> >> > > > > flow control than we provide with pause/resume.
> >> > > > >
> >> > > > > -Jason
> >> > > > >
> >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <gwen@confluent.io> wrote:
> >> > > > >
> >> > > > > > The wiki you pointed to is no longer maintained and fell out of
> >> > > > > > sync with the code and protocol.
> >> > > > > >
> >> > > > > > You may want to refer to:
> >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >> > > > > >
> >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.rantil@tink.se> wrote:
> >> > > > > >
> >> > > > > > > Hi guys,
> >> > > > > > >
> >> > > > > > > I realized I never thanked y'all for your input. Thanks!
> >> > > > > > > Jason: I apologize for assuming your stance on the issue! It
> >> > > > > > > feels like we all agreed on the solution. +1
> >> > > > > > >
> >> > > > > > > Follow-up: Jason made a point about defining prefetch and
> >> > > > > > > fairness behaviour in the KIP. I am now working on putting that
> >> > > > > > > down in writing. To be able to do this, I think I need to
> >> > > > > > > understand the current prefetch behaviour in the new consumer
> >> > > > > > > API (0.9) a bit better. Some specific questions:
> >> > > > > > >
> >> > > > > > >    - How does a consumer balance incoming messages from
> >> > > > > > >      multiple partitions? Is the consumer simply issuing
> >> > > > > > >      Multi-Fetch requests[1] for its assigned partitions of
> >> > > > > > >      the relevant topics? Or is the consumer fetching from one
> >> > > > > > >      partition at a time and balancing between them
> >> > > > > > >      internally? That is, is the responsibility for partition
> >> > > > > > >      balancing (and fairness) on the broker side or the
> >> > > > > > >      consumer side?
> >> > > > > > >    - Is the above documented somewhere?
> >> > > > > > >
> >> > > > > > > [1] https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> >> > > > > > >     (see "Multi-Fetch")
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Jens
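
On the first question above, the 0.9 Java consumer appears not to fetch one partition at a time: as far as can be told (an assumption worth checking against the consumer's fetcher code), it groups its assigned partitions by the broker that leads each one and sends one fetch request per broker covering all of them, with each partition's share of the response bounded by max.partition.fetch.bytes. The sketch models only that grouping step; build_fetch_requests and its inputs are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]         # (topic, partition)
FetchEntry = Tuple[TopicPartition, int]  # (partition, offset to fetch from)


def build_fetch_requests(assigned: List[TopicPartition],
                         leaders: Dict[TopicPartition, int],
                         positions: Dict[TopicPartition, int]
                         ) -> Dict[int, List[FetchEntry]]:
    """Group assigned partitions by leader broker id: one request per broker."""
    requests: Dict[int, List[FetchEntry]] = defaultdict(list)
    for tp in assigned:
        leader = leaders.get(tp)
        if leader is None:
            continue  # leader unknown; wait for a metadata refresh
        requests[leader].append((tp, positions[tp]))
    return dict(requests)


assigned = [("orders", 0), ("orders", 1), ("orders", 2)]
leaders = {("orders", 0): 1, ("orders", 1): 2, ("orders", 2): 1}
positions = {("orders", 0): 42, ("orders", 1): 7, ("orders", 2): 0}
requests = build_fetch_requests(assigned, leaders, positions)
```

Under this model, balancing between partitions that share a broker happens inside a single response, so any fairness across partitions in what poll() returns would be decided on the consumer side.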
> >> > > > > > >
> >> > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ismael@juma.me.uk> wrote:
> >> > > > > > >
> >> > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <gwen@confluent.io> wrote:
> >> > > > > > > >
> >> > > > > > > > > Given the background, it sounds like you'll generally want
> >> > > > > > > > > each call to poll() to return the same number of events
> >> > > > > > > > > (which is the number you planned on having enough memory /
> >> > > > > > > > > time for). It also sounds like tuning the number of events
> >> > > > > > > > > will be closely tied to tuning the session timeout. That
> >> > > > > > > > > is, if I choose to lower the session timeout for some
> >> > > > > > > > > reason, I will have to modify the number of records
> >> > > > > > > > > returned too.
> >> > > > > > > > >
> >> > > > > > > > > If those assumptions are correct, I think a configuration
> >> > > > > > > > > makes more sense:
> >> > > > > > > > > 1. We are unlikely to want this parameter to change during
> >> > > > > > > > >    the lifetime of the consumer.
> >> > > > > > > > > 2. The correct value is tied to another configuration
> >> > > > > > > > >    parameter, so they will be controlled together.
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > I was thinking the same thing.
> >> > > > > > > >
> >> > > > > > > > Ismael
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > --
> >> > > > > > > Jens Rantil
> >> > > > > > > Backend engineer
> >> > > > > > > Tink AB
> >> > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> Cliff Rhyne
> >> Software Engineering Lead
> >>
> >
> >
>
