kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jason Gustafson <ja...@confluent.io>
Subject Re: [DISCUSS] scalability limits in the coordinator
Date Tue, 24 May 2016 00:08:25 GMT
>
> Jason, doesn't gzip (or other compression) basically do this? If the topic
> is a string and the topic is repeated throughout, won't compression
> basically replace all repeated instances of it with an index reference to
> the full string?


Hey James, yeah, that's probably true, but keep in mind that the
compression happens on the broker side. It would be nice to have a more
compact representation so that get some benefit over the wire as well. This
seems to be less of a concern here, so the bigger gains are probably from
reducing the number of partitions that need to be listed individually.

-Jason

On Mon, May 23, 2016 at 4:23 PM, Onur Karaman <onurkaraman.apache@gmail.com>
wrote:

> When figuring out these optimizations, it's worth keeping in mind the
> improvements when the message is uncompressed vs when it's compressed.
>
> When uncompressed:
> Fixing the Assignment serialization to instead be a topic index into the
> corresponding member's subscription list would usually be a good thing.
>
> I think the proposal is only worse when the topic names are small. The
> Type.STRING we use in our protocol for the assignment's TOPIC_KEY_NAME is
> limited in length to Short.MAX_VALUE, so our strings are first prepended
> with 2 bytes to indicate the string size.
>
> The new proposal does worse when:
> 2 + utf_encoded_string_payload_size < index_type_size
> in other words when:
> utf_encoded_string_payload_size < index_type_size - 2
>
> If the index type ends up being Type.INT32, then the proposal is worse when
> the topic is length 1.
> If the index type ends up being Type.INT64, then the proposal is worse when
> the topic is less than length 6.
>
> When compressed:
> As James Cheng brought up, I'm not sure how things change when compression
> comes into the picture. This would be worth investigating.
>
> On Mon, May 23, 2016 at 4:05 PM, James Cheng <wushujames@gmail.com> wrote:
>
> >
> > > On May 23, 2016, at 10:59 AM, Jason Gustafson <jason@confluent.io>
> > wrote:
> > >
> > > 2. Maybe there's a better way to lay out the assignment without needing
> > to
> > > explicitly repeat the topic? For example, the leader could sort the
> > topics
> > > for each member and just use an integer to represent the index of each
> > > topic within the sorted list (note this depends on the subscription
> > > including the full topic list).
> > >
> > > Assignment -> [TopicIndex [Partition]]
> > >
> >
> > Jason, doesn't gzip (or other compression) basically do this? If the
> topic
> > is a string and the topic is repeated throughout, won't compression
> > basically replace all repeated instances of it with an index reference to
> > the full string?
> >
> > -James
> >
> > > You could even combine these two options so that you have only 3
> integers
> > > for each topic assignment:
> > >
> > > Assignment -> [TopicIndex MinPartition MaxPartition]
> > >
> > > There may even be better options with a little more thought. All of
> this
> > is
> > > just part of the client-side protocol, so it wouldn't require any
> version
> > > bumps on the broker. What do you think?
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > >
> > > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> > >
> > >> The original concern is that regex may not be efficiently supported
> > >> across-languages, but if there is a neat workaround I would love to
> > learn.
> > >>
> > >> Guozhang
> > >>
> > >> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma <ismael@juma.me.uk>
> wrote:
> > >>
> > >>> +1 to Jun's suggestion.
> > >>>
> > >>> Having said that, as a general point, I think we should consider
> > >> supporting
> > >>> topic patterns in the wire protocol. It requires some thinking for
> > >>> cross-language support, but it seems surmountable and it could make
> > >> certain
> > >>> operations a lot more efficient (the fact that a basic regex
> > subscription
> > >>> causes the consumer to request metadata for all topics is not great).
> > >>>
> > >>> Ismael
> > >>>
> > >>> On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang <wangguoz@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> I like Jun's suggestion in changing the handling logics of single
> > large
> > >>>> message on the consumer side.
> > >>>>
> > >>>> As for the case of "a single group subscribing to 3000 topics",
with
> > >> 100
> > >>>> consumers the 2.5Mb Gzip size is reasonable to me (when storing
in
> ZK,
> > >> we
> > >>>> also have the znode limit which is set to 1Mb by default, though
> > >>> admittedly
> > >>>> it is only for one consumer). And if we do the change as Jun
> > suggested,
> > >>>> 2.5Mb on follower's memory pressure is OK I think.
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>> On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> > >>>> onurkaraman.apache@gmail.com
> > >>>>> wrote:
> > >>>>
> > >>>>> Results without compression:
> > >>>>> 1 consumer 292383 bytes
> > >>>>> 5 consumers 1079579 bytes * the tipping point
> > >>>>> 10 consumers 1855018 bytes
> > >>>>> 20 consumers 2780220 bytes
> > >>>>> 30 consumers 3705422 bytes
> > >>>>> 40 consumers 4630624 bytes
> > >>>>> 50 consumers 5555826 bytes
> > >>>>> 60 consumers 6480788 bytes
> > >>>>> 70 consumers 7405750 bytes
> > >>>>> 80 consumers 8330712 bytes
> > >>>>> 90 consumers 9255674 bytes
> > >>>>> 100 consumers 10180636 bytes
> > >>>>>
> > >>>>> So it looks like gzip compression shrinks the message size
by 4x.
> > >>>>>
> > >>>>> On Sat, May 21, 2016 at 9:47 AM, Jun Rao <jun@confluent.io>
wrote:
> > >>>>>
> > >>>>>> Onur,
> > >>>>>>
> > >>>>>> Thanks for the investigation.
> > >>>>>>
> > >>>>>> Another option is to just fix how we deal with the case
when a
> > >>> message
> > >>>> is
> > >>>>>> larger than the fetch size. Today, if the fetch size is
smaller
> > >> than
> > >>>> the
> > >>>>>> fetch size, the consumer will get stuck. Instead, we can
simply
> > >>> return
> > >>>>> the
> > >>>>>> full message if it's larger than the fetch size w/o requiring
the
> > >>>>> consumer
> > >>>>>> to manually adjust the fetch size. On the broker side,
to serve a
> > >>> fetch
> > >>>>>> request, we already do an index lookup and then scan the
log a bit
> > >> to
> > >>>>> find
> > >>>>>> the message with the requested offset. We can just check
the size
> > >> of
> > >>>> that
> > >>>>>> message and return the full message if its size is larger
than the
> > >>>> fetch
> > >>>>>> size. This way, fetch size is really for performance optimization,
> > >>> i.e.
> > >>>>> in
> > >>>>>> the common case, we will not return more bytes than fetch
size,
> but
> > >>> if
> > >>>>>> there is a large message, we will return more bytes than
the
> > >>> specified
> > >>>>>> fetch size. In practice, large messages are rare. So, it
shouldn't
> > >>>>> increase
> > >>>>>> the memory consumption on the client too much.
> > >>>>>>
> > >>>>>> Jun
> > >>>>>>
> > >>>>>> On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <
> > >>>>>> onurkaraman.apache@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hey everyone. So I started doing some tests on the
new
> > >>>>>> consumer/coordinator
> > >>>>>>> to see if it could handle more strenuous use cases
like mirroring
> > >>>>>> clusters
> > >>>>>>> with thousands of topics and thought I'd share whatever
I have so
> > >>>> far.
> > >>>>>>>
> > >>>>>>> The scalability limit: the amount of group metadata
we can fit
> > >> into
> > >>>> one
> > >>>>>>> message
> > >>>>>>>
> > >>>>>>> Some background:
> > >>>>>>> Client-side assignment is implemented in two phases
> > >>>>>>> 1. a PreparingRebalance phase that identifies members
of the
> > >> group
> > >>>> and
> > >>>>>>> aggregates member subscriptions.
> > >>>>>>> 2. an AwaitingSync phase that waits for the group leader
to
> > >> decide
> > >>>>> member
> > >>>>>>> assignments based on the member subscriptions across
the group.
> > >>>>>>>  - The leader announces this decision with a SyncGroupRequest.
> > >> The
> > >>>>>>> GroupCoordinator handles SyncGroupRequests by appending
all group
> > >>>> state
> > >>>>>>> into a single message under the __consumer_offsets
topic. This
> > >>>> message
> > >>>>> is
> > >>>>>>> keyed on the group id and contains each member subscription
as
> > >> well
> > >>>> as
> > >>>>>> the
> > >>>>>>> decided assignment for each member.
> > >>>>>>>
> > >>>>>>> The environment:
> > >>>>>>> - one broker
> > >>>>>>> - one __consumer_offsets partition
> > >>>>>>> - offsets.topic.compression.codec=1 // this is gzip
> > >>>>>>> - broker has my pending KAFKA-3718 patch that actually
makes use
> > >> of
> > >>>>>>> offsets.topic.compression.codec:
> > >>>>>> https://github.com/apache/kafka/pull/1394
> > >>>>>>> - around 3000 topics. This is an actual subset of topics
from one
> > >>> of
> > >>>>> our
> > >>>>>>> clusters.
> > >>>>>>> - topics have 8 partitions
> > >>>>>>> - topics are 25 characters long on average
> > >>>>>>> - one group with a varying number of consumers each
hardcoded
> > >> with
> > >>>> all
> > >>>>>> the
> > >>>>>>> topics just to make the tests more consistent. wildcarding
with
> > >> .*
> > >>>>> should
> > >>>>>>> have the same effect once the subscription hits the
coordinator
> > >> as
> > >>>> the
> > >>>>>>> subscription has already been fully expanded out to
the list of
> > >>>> topics
> > >>>>> by
> > >>>>>>> the consumers.
> > >>>>>>> - I added some log messages to Log.scala to print out
the message
> > >>>> sizes
> > >>>>>>> after compression
> > >>>>>>> - there are no producers at all and auto commits are
disabled.
> > >> The
> > >>>> only
> > >>>>>>> topic with messages getting added is the __consumer_offsets
topic
> > >>> and
> > >>>>>>> they're only from storing group metadata while processing
> > >>>>>>> SyncGroupRequests.
> > >>>>>>>
> > >>>>>>> Results:
> > >>>>>>> The results below show that we exceed the 1000012 byte
> > >>>>>>> KafkaConfig.messageMaxBytes limit relatively quickly
(between
> > >> 30-40
> > >>>>>>> consumers):
> > >>>>>>> 1 consumer 54739 bytes
> > >>>>>>> 5 consumers 261524 bytes
> > >>>>>>> 10 consumers 459804 bytes
> > >>>>>>> 20 consumers 702499 bytes
> > >>>>>>> 30 consumers 930525 bytes
> > >>>>>>> 40 consumers 1115657 bytes * the tipping point
> > >>>>>>> 50 consumers 1363112 bytes
> > >>>>>>> 60 consumers 1598621 bytes
> > >>>>>>> 70 consumers 1837359 bytes
> > >>>>>>> 80 consumers 2066934 bytes
> > >>>>>>> 90 consumers 2310970 bytes
> > >>>>>>> 100 consumers 2542735 bytes
> > >>>>>>>
> > >>>>>>> Note that the growth itself is pretty gradual. Plotting
the
> > >> points
> > >>>>> makes
> > >>>>>> it
> > >>>>>>> look roughly linear w.r.t the number of consumers:
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)
> > >>>>>>>
> > >>>>>>> Also note that these numbers aren't averages or medians
or
> > >> anything
> > >>>>> like
> > >>>>>>> that. It's just the byte size from a given run. I did
run them a
> > >>> few
> > >>>>>> times
> > >>>>>>> and saw similar results.
> > >>>>>>>
> > >>>>>>> Impact:
> > >>>>>>> Even after adding gzip to the __consumer_offsets topic
with my
> > >>>> pending
> > >>>>>>> KAFKA-3718 patch, the AwaitingSync phase of the group
fails with
> > >>>>>>> RecordTooLargeException. This means the combined size
of each
> > >>>> member's
> > >>>>>>> subscriptions and assignments exceeded the
> > >>>> KafkaConfig.messageMaxBytes
> > >>>>> of
> > >>>>>>> 1000012 bytes. The group ends up dying.
> > >>>>>>>
> > >>>>>>> Options:
> > >>>>>>> 1. Config change: reduce the number of consumers in
the group.
> > >> This
> > >>>>> isn't
> > >>>>>>> always a realistic answer in more strenuous use cases
like
> > >>>> MirrorMaker
> > >>>>>>> clusters or for auditing.
> > >>>>>>> 2. Config change: split the group into smaller groups
which
> > >>> together
> > >>>>> will
> > >>>>>>> get full coverage of the topics. This gives each group
member a
> > >>>> smaller
> > >>>>>>> subscription.(ex: g1 has topics starting with a-m while
g2 has
> > >>> topics
> > >>>>>>> starting ith n-z). This would be operationally painful
to manage.
> > >>>>>>> 3. Config change: split the topics among members of
the group.
> > >>> Again
> > >>>>> this
> > >>>>>>> gives each group member a smaller subscription. This
would also
> > >> be
> > >>>>>>> operationally painful to manage.
> > >>>>>>> 4. Config change: bump up KafkaConfig.messageMaxBytes
(a
> > >>> topic-level
> > >>>>>>> config) and KafkaConfig.replicaFetchMaxBytes (a broker-level
> > >>> config).
> > >>>>>>> Applying messageMaxBytes to just the __consumer_offsets
topic
> > >> seems
> > >>>>>>> relatively harmless, but bumping up the broker-level
> > >>>>> replicaFetchMaxBytes
> > >>>>>>> would probably need more attention.
> > >>>>>>> 5. Config change: try different compression codecs.
Based on 2
> > >>>> minutes
> > >>>>> of
> > >>>>>>> googling, it seems like lz4 and snappy are faster than
gzip but
> > >>> have
> > >>>>>> worse
> > >>>>>>> compression, so this probably won't help.
> > >>>>>>> 6. Implementation change: support sending the regex
over the wire
> > >>>>> instead
> > >>>>>>> of the fully expanded topic subscriptions. I think
people said in
> > >>> the
> > >>>>>> past
> > >>>>>>> that different languages have subtle differences in
regex, so
> > >> this
> > >>>>>> doesn't
> > >>>>>>> play nicely with cross-language groups.
> > >>>>>>> 7. Implementation change: maybe we can reverse the
mapping?
> > >> Instead
> > >>>> of
> > >>>>>>> mapping from member to subscriptions, we can map a
subscription
> > >> to
> > >>> a
> > >>>>> list
> > >>>>>>> of members.
> > >>>>>>> 8. Implementation change: maybe we can try to break
apart the
> > >>>>>> subscription
> > >>>>>>> and assignments from the same SyncGroupRequest into
multiple
> > >>> records?
> > >>>>>> They
> > >>>>>>> can still go to the same message set and get appended
together.
> > >>> This
> > >>>>> way
> > >>>>>>> the limit become the segment size, which shouldn't
be a problem.
> > >>> This
> > >>>>> can
> > >>>>>>> be tricky to get right because we're currently keying
these
> > >>> messages
> > >>>> on
> > >>>>>> the
> > >>>>>>> group, so I think records from the same rebalance might
> > >>> accidentally
> > >>>>>>> compact one another, but my understanding of compaction
isn't
> > >> that
> > >>>>> great.
> > >>>>>>>
> > >>>>>>> Todo:
> > >>>>>>> It would be interesting to rerun the tests with no
compression
> > >> just
> > >>>> to
> > >>>>>> see
> > >>>>>>> how much gzip is helping but it's getting late. Maybe
tomorrow?
> > >>>>>>>
> > >>>>>>> - Onur
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message