kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aarti Gupta <aartigup...@gmail.com>
Subject Re: KIP-41: KafkaConsumer Max Records
Date Fri, 08 Jan 2016 01:21:46 GMT
@Jason,
(apologies, got your name wrong the first time round)

On Thu, Jan 7, 2016 at 5:15 PM, Aarti Gupta <aartiguptaa@gmail.com> wrote:

> Hi Json,
>
> I am concerned about how many records can be prefetched into consumer
> memory.
> Currently we control the maximum number of bytes per topic and partition
> by setting fetch.message.max.bytes
>
> The max.partition.fetch.bytes = #no of partitions *
> fetch.message.max.bytes
> However, partitions can be added dynamically, which would mean that a
> single process (for example a single JVM with multiple consumers), that
> consumes messages from large number of partitions may not able to keep all
> the pre fetched messages in memory.
>
> Additionally, if the relative size of messages is highly variable, it
> would be hard to correlate the max size in bytes for message fetch with the
> number of records returned on a poll.
> We previously observed (in a production setup), that, if the size of the
> message is greater than fetch.message.max.bytes, the consumer gets stuck.
> This encouraged us to increase the fetch.message.max.bytes to a
> significantly large value. This would worsen the memory consumption fear
> described above,( when the number of partitions is also large.)
>
> While there may not be a single magic formula to predict the correct
> combination of fetch.message.max.bytes and #*max.poll.records, **maybe we
> can make the prefetch algorithm a mathematical function of the f*etch.message.max.bytes
> and #noofpartitions?
>
> thoughts?
> Thanks
> aarti
>
> additional unimportant note: the link to the JIRA in the KIP is broken
>
> On Thu, Jan 7, 2016 at 2:37 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
>> Thanks Jason. I think it is a good feature to add, +1.
>>
>> As suggested in KIP-32, we'd better to keep end state of the KIP wiki with
>> finalized implementation details rather than leaving a list of options. I
>> agree that for both fairness and pre-fetching the simpler approach would
>> be
>> sufficient for most of the time. So could we move the other approach to
>> "rejected"?
>>
>> Guozhang
>>
>> On Wed, Jan 6, 2016 at 6:14 PM, Gwen Shapira <gwen@confluent.io> wrote:
>>
>> > I like the fair-consumption approach you chose - "pull as many records
>> as
>> > possible from each partition in a similar round-robin fashion", it is
>> very
>> > intuitive and close enough to fair.
>> >
>> > Overall, I'm +1 on the KIP. But you'll need a formal vote :)
>> >
>> > On Wed, Jan 6, 2016 at 6:05 PM, Jason Gustafson <jason@confluent.io>
>> > wrote:
>> >
>> > > Thanks for the suggestion, Ismael. I updated the KIP.
>> > >
>> > > -Jason
>> > >
>> > > On Wed, Jan 6, 2016 at 6:57 AM, Ismael Juma <ismael@juma.me.uk>
>> wrote:
>> > >
>> > > > Thanks Jason. I read the KIP and it makes sense to me. A minor
>> > > suggestion:
>> > > > in the "Ensuring Fair Consumption" section, there are 3 paragraphs
>> > with 2
>> > > > examples (2 partitions with 100 max.poll.records and 3 partitions
>> with
>> > 30
>> > > > max.poll.records). I think you could simplify this by using one of
>> the
>> > > > examples in the 3 paragraphs.
>> > > >
>> > > > Ismael
>> > > >
>> > > > On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson <jason@confluent.io
>> >
>> > > > wrote:
>> > > >
>> > > > > I've updated the KIP with some implementation details. I also
>> added
>> > > more
>> > > > > discussion on the heartbeat() alternative. The short answer for
>> why
>> > we
>> > > > > rejected this API is that it doesn't seem to work well with offset
>> > > > commits.
>> > > > > This would tend to make correct usage complicated and difficult
to
>> > > > explain.
>> > > > > Additionally, we don't see any clear advantages over having a
way
>> to
>> > > set
>> > > > > the max records. For example, using max.records=1 would be
>> equivalent
>> > > to
>> > > > > invoking heartbeat() on each iteration of the message processing
>> > loop.
>> > > > >
>> > > > > Going back to the discussion on whether we should use a
>> configuration
>> > > > value
>> > > > > or overload poll(), I'm leaning toward the configuration option
>> > mainly
>> > > > for
>> > > > > compatibility and to keep the KafkaConsumer API from getting
any
>> more
>> > > > > complex. Also, as others have mentioned, it seems reasonable
to
>> want
>> > to
>> > > > > tune this setting in the same place that the session timeout
and
>> > > > heartbeat
>> > > > > interval are configured. I still feel a little uncomfortable
with
>> the
>> > > > need
>> > > > > to do a lot of configuration tuning to get the consumer working
>> for a
>> > > > > particular environment, but hopefully the defaults are
>> conservative
>> > > > enough
>> > > > > that most users won't need to. However, if it remains a problem,
>> then
>> > > we
>> > > > > could still look into better options for managing the size of
>> batches
>> > > > > including overloading poll() with a max records argument or
>> possibly
>> > by
>> > > > > implementing a batch scaling algorithm internally.
>> > > > >
>> > > > > -Jason
>> > > > >
>> > > > >
>> > > > > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <
>> jason@confluent.io
>> > >
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Cliff,
>> > > > > >
>> > > > > > I think we're all agreed that the current contract of poll()
>> should
>> > > be
>> > > > > > kept. The consumer wouldn't wait for max messages to become
>> > available
>> > > > in
>> > > > > > this proposal; it would only sure that it never returns
more
>> than
>> > max
>> > > > > > messages.
>> > > > > >
>> > > > > > -Jason
>> > > > > >
>> > > > > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crhyne@signal.co>
>> > > wrote:
>> > > > > >
>> > > > > >> Instead of a heartbeat, I'd prefer poll() to return
whatever
>> > > messages
>> > > > > the
>> > > > > >> client has.  Either a) I don't care if I get less than
my max
>> > > message
>> > > > > >> limit
>> > > > > >> or b) I do care and will set a larger timeout.  Case
B is less
>> > > common
>> > > > > than
>> > > > > >> A and is fairly easy to handle in the application's
code.
>> > > > > >>
>> > > > > >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <
>> gwen@confluent.io>
>> > > > wrote:
>> > > > > >>
>> > > > > >> > 1. Agree that TCP window style scaling will be
cool. I'll
>> try to
>> > > > think
>> > > > > >> of a
>> > > > > >> > good excuse to use it ;)
>> > > > > >> >
>> > > > > >> > 2. I'm very concerned about the challenges of getting
the
>> > > timeouts,
>> > > > > >> > hearbeats and max messages right.
>> > > > > >> >
>> > > > > >> > Another option could be to expose "heartbeat" API
to
>> consumers.
>> > If
>> > > > my
>> > > > > >> app
>> > > > > >> > is still processing data but is still alive, it
could
>> initiate a
>> > > > > >> heartbeat
>> > > > > >> > to signal its alive without having to handle additional
>> > messages.
>> > > > > >> >
>> > > > > >> > I don't know if this improves more than it complicates
>> though :(
>> > > > > >> >
>> > > > > >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson
<
>> > > > jason@confluent.io>
>> > > > > >> > wrote:
>> > > > > >> >
>> > > > > >> > > Hey Gwen,
>> > > > > >> > >
>> > > > > >> > > I was thinking along the lines of TCP window
scaling in
>> order
>> > to
>> > > > > >> > > dynamically find a good consumption rate.
Basically you'd
>> > start
>> > > > off
>> > > > > >> > > consuming say 100 records and you'd let it
increase until
>> the
>> > > > > >> consumption
>> > > > > >> > > took longer than half the session timeout
(for example).
>> You
>> > > > /might/
>> > > > > >> be
>> > > > > >> > > able to achieve the same thing using pause/resume,
but it
>> > would
>> > > > be a
>> > > > > >> lot
>> > > > > >> > > trickier since you have to do it at the granularity
of
>> > > partitions.
>> > > > > But
>> > > > > >> > > yeah, database write performance doesn't always
scale in a
>> > > > > predictable
>> > > > > >> > > enough way to accommodate this, so I'm not
sure how useful
>> it
>> > > > would
>> > > > > >> be in
>> > > > > >> > > practice. It might also be more difficult
to implement
>> since
>> > it
>> > > > > >> wouldn't
>> > > > > >> > be
>> > > > > >> > > as clear when to initiate the next fetch.
With a static
>> > setting,
>> > > > the
>> > > > > >> > > consumer knows exactly how many records will
be returned on
>> > the
>> > > > next
>> > > > > >> call
>> > > > > >> > > to poll() and can send fetches accordingly.
>> > > > > >> > >
>> > > > > >> > > On the other hand, I do feel a little wary
of the need to
>> tune
>> > > the
>> > > > > >> > session
>> > > > > >> > > timeout and max messages though since these
settings might
>> > > depend
>> > > > on
>> > > > > >> the
>> > > > > >> > > environment that the consumer is deployed
in. It wouldn't
>> be a
>> > > big
>> > > > > >> deal
>> > > > > >> > if
>> > > > > >> > > the impact was relatively minor, but getting
them wrong can
>> > > cause
>> > > > a
>> > > > > >> lot
>> > > > > >> > of
>> > > > > >> > > rebalance churn which could keep the consumer
from making
>> any
>> > > > > >> progress.
>> > > > > >> > > It's not a particularly graceful failure.
>> > > > > >> > >
>> > > > > >> > > -Jason
>> > > > > >> > >
>> > > > > >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira
<
>> > > gwen@confluent.io>
>> > > > > >> wrote:
>> > > > > >> > >
>> > > > > >> > > > I can't speak to all use-cases, but for
the database
>> one, I
>> > > > think
>> > > > > >> > > > pause-resume will be necessary in any
case, and therefore
>> > > > dynamic
>> > > > > >> batch
>> > > > > >> > > > sizes are not needed.
>> > > > > >> > > >
>> > > > > >> > > > Databases are really unexpected regarding
response times
>> -
>> > > load
>> > > > > and
>> > > > > >> > > locking
>> > > > > >> > > > can affect this. I'm not sure there's
a good way to know
>> you
>> > > are
>> > > > > >> going
>> > > > > >> > > into
>> > > > > >> > > > rebalance hell before it is too late.
So if I were
>> writing
>> > > code
>> > > > > that
>> > > > > >> > > > updates an RDBMS based on Kafka, I'd
pick a reasonable
>> batch
>> > > > size
>> > > > > >> (say
>> > > > > >> > > 5000
>> > > > > >> > > > records), and basically pause, batch-insert
all records,
>> > > commit
>> > > > > and
>> > > > > >> > > resume.
>> > > > > >> > > >
>> > > > > >> > > > Does that make sense?
>> > > > > >> > > >
>> > > > > >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason
Gustafson <
>> > > > > >> jason@confluent.io>
>> > > > > >> > > > wrote:
>> > > > > >> > > >
>> > > > > >> > > > > Gwen and Ismael,
>> > > > > >> > > > >
>> > > > > >> > > > > I agree the configuration option
is probably the way to
>> > go,
>> > > > but
>> > > > > I
>> > > > > >> was
>> > > > > >> > > > > wondering whether there would be
cases where it made
>> sense
>> > > to
>> > > > > let
>> > > > > >> the
>> > > > > >> > > > > consumer dynamically set max messages
to adjust for
>> > > downstream
>> > > > > >> > > slowness.
>> > > > > >> > > > > For example, if the consumer is
writing consumed
>> records
>> > to
>> > > > > >> another
>> > > > > >> > > > > database, and that database is experiencing
heavier
>> than
>> > > > > expected
>> > > > > >> > load,
>> > > > > >> > > > > then the consumer could halve its
current max messages
>> in
>> > > > order
>> > > > > to
>> > > > > >> > > adapt
>> > > > > >> > > > > without risking rebalance hell.
It could then increase
>> max
>> > > > > >> messages
>> > > > > >> > as
>> > > > > >> > > > the
>> > > > > >> > > > > load on the database decreases.
It's basically an
>> easier
>> > way
>> > > > to
>> > > > > >> > handle
>> > > > > >> > > > flow
>> > > > > >> > > > > control than we provide with pause/resume.
>> > > > > >> > > > >
>> > > > > >> > > > > -Jason
>> > > > > >> > > > >
>> > > > > >> > > > > On Mon, Jan 4, 2016 at 9:46 AM,
Gwen Shapira <
>> > > > gwen@confluent.io
>> > > > > >
>> > > > > >> > > wrote:
>> > > > > >> > > > >
>> > > > > >> > > > > > The wiki you pointed to is
no longer maintained and
>> fell
>> > > out
>> > > > > of
>> > > > > >> > sync
>> > > > > >> > > > with
>> > > > > >> > > > > > the code and protocol.
>> > > > > >> > > > > >
>> > > > > >> > > > > > You may want  to refer to:
>> > > > > >> > > > > >
>> > > > > >> > > > > >
>> > > > > >> > > > >
>> > > > > >> > > >
>> > > > > >> > >
>> > > > > >> >
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>> > > > > >> > > > > >
>> > > > > >> > > > > > On Mon, Jan 4, 2016 at 4:38
AM, Jens Rantil <
>> > > > > >> jens.rantil@tink.se>
>> > > > > >> > > > wrote:
>> > > > > >> > > > > >
>> > > > > >> > > > > > > Hi guys,
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > I realized I never thanked
yall for your input -
>> > thanks!
>> > > > > >> > > > > > > Jason: I apologize for
assuming your stance on the
>> > > issue!
>> > > > > >> Feels
>> > > > > >> > > like
>> > > > > >> > > > we
>> > > > > >> > > > > > all
>> > > > > >> > > > > > > agreed on the solution.
+1
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > Follow-up: Jason made
a point about defining
>> prefetch
>> > > and
>> > > > > >> > fairness
>> > > > > >> > > > > > > behaviour in the KIP.
I am now working on putting
>> that
>> > > > down
>> > > > > in
>> > > > > >> > > > writing.
>> > > > > >> > > > > > To
>> > > > > >> > > > > > > do be able to do this
I think I need to understand
>> the
>> > > > > current
>> > > > > >> > > > prefetch
>> > > > > >> > > > > > > behaviour in the new consumer
API (0.9) a bit
>> better.
>> > > Some
>> > > > > >> > specific
>> > > > > >> > > > > > > questions:
>> > > > > >> > > > > > >
>> > > > > >> > > > > > >    - How does a specific
consumer balance incoming
>> > > > messages
>> > > > > >> from
>> > > > > >> > > > > multiple
>> > > > > >> > > > > > >    partitions? Is the
consumer simply issuing
>> > > Multi-Fetch
>> > > > > >> > > requests[1]
>> > > > > >> > > > > for
>> > > > > >> > > > > > > the
>> > > > > >> > > > > > >    consumed assigned partitions
of the relevant
>> > topics?
>> > > Or
>> > > > > is
>> > > > > >> the
>> > > > > >> > > > > > consumer
>> > > > > >> > > > > > >    fetching from one partition
at a time and
>> balancing
>> > > > > between
>> > > > > >> > them
>> > > > > >> > > > > > >    internally? That is,
is the responsibility of
>> > > partition
>> > > > > >> > > balancing
>> > > > > >> > > > > (and
>> > > > > >> > > > > > >    fairness) on the broker
side or consumer side?
>> > > > > >> > > > > > >    - Is the above documented
somewhere?
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > [1]
>> > > > > >> > > > > > >
>> > > > > >> > > > > > >
>> > > > > >> > > > > >
>> > > > > >> > > > >
>> > > > > >> > > >
>> > > > > >> > >
>> > > > > >> >
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
>> > > > > >> > > > > > > ,
>> > > > > >> > > > > > > see "Multi-Fetch".
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > Thanks,
>> > > > > >> > > > > > > Jens
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > On Wed, Dec 23, 2015 at
2:44 AM, Ismael Juma <
>> > > > > >> ismael@juma.me.uk>
>> > > > > >> > > > > wrote:
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > > On Wed, Dec 23, 2015
at 1:24 AM, Gwen Shapira <
>> > > > > >> > gwen@confluent.io
>> > > > > >> > > >
>> > > > > >> > > > > > wrote:
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > > > > Given the background,
it sounds like you'll
>> > > generally
>> > > > > want
>> > > > > >> > each
>> > > > > >> > > > > call
>> > > > > >> > > > > > to
>> > > > > >> > > > > > > > > poll() to return
the same number of events
>> (which
>> > is
>> > > > the
>> > > > > >> > number
>> > > > > >> > > > you
>> > > > > >> > > > > > > > planned
>> > > > > >> > > > > > > > > on having enough
memory / time for). It also
>> > sounds
>> > > > like
>> > > > > >> > tuning
>> > > > > >> > > > the
>> > > > > >> > > > > > > > number
>> > > > > >> > > > > > > > > of events will
be closely tied to tuning the
>> > session
>> > > > > >> timeout.
>> > > > > >> > > > That
>> > > > > >> > > > > > is -
>> > > > > >> > > > > > > > if
>> > > > > >> > > > > > > > > I choose to
lower the session timeout for some
>> > > > reason, I
>> > > > > >> will
>> > > > > >> > > > have
>> > > > > >> > > > > to
>> > > > > >> > > > > > > > > modify the number
of records returning too.
>> > > > > >> > > > > > > > >
>> > > > > >> > > > > > > > > If those assumptions
are correct, I think a
>> > > > > configuration
>> > > > > >> > makes
>> > > > > >> > > > > more
>> > > > > >> > > > > > > > sense.
>> > > > > >> > > > > > > > > 1. We are unlikely
to want this parameter to be
>> > > change
>> > > > > at
>> > > > > >> the
>> > > > > >> > > > > > lifetime
>> > > > > >> > > > > > > of
>> > > > > >> > > > > > > > > the consumer
>> > > > > >> > > > > > > > > 2. The correct
value is tied to another
>> > > configuration
>> > > > > >> > > parameter,
>> > > > > >> > > > so
>> > > > > >> > > > > > > they
>> > > > > >> > > > > > > > > will be controlled
together.
>> > > > > >> > > > > > > > >
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > > > I was thinking the
same thing.
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > > > Ismael
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > >
>> > > > > >> > > > > > >
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > --
>> > > > > >> > > > > > > Jens Rantil
>> > > > > >> > > > > > > Backend engineer
>> > > > > >> > > > > > > Tink AB
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > Email: jens.rantil@tink.se
>> > > > > >> > > > > > > Phone: +46 708 84 18 32
>> > > > > >> > > > > > > Web: www.tink.se
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > Facebook <https://www.facebook.com/#!/tink.se>
>> > Linkedin
>> > > > > >> > > > > > > <
>> > > > > >> > > > > > >
>> > > > > >> > > > > >
>> > > > > >> > > > >
>> > > > > >> > > >
>> > > > > >> > >
>> > > > > >> >
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > >  Twitter <https://twitter.com/tink>
>> > > > > >> > > > > > >
>> > > > > >> > > > > >
>> > > > > >> > > > >
>> > > > > >> > > >
>> > > > > >> > >
>> > > > > >> >
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >> --
>> > > > > >> Cliff Rhyne
>> > > > > >> Software Engineering Lead
>> > > > > >> e: crhyne@signal.co
>> > > > > >> signal.co
>> > > > > >> ________________________
>> > > > > >>
>> > > > > >> Cut Through the Noise
>> > > > > >>
>> > > > > >> This e-mail and any files transmitted with it are for
the sole
>> use
>> > > of
>> > > > > the
>> > > > > >> intended recipient(s) and may contain confidential and
>> privileged
>> > > > > >> information. Any unauthorized use of this email is strictly
>> > > > prohibited.
>> > > > > >> ©2015 Signal. All rights reserved.
>> > > > > >>
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message