kafka-dev mailing list archives

From Onur Karaman <onurkaraman.apa...@gmail.com>
Subject Re: [DISCUSS] scalability limits in the coordinator
Date Mon, 23 May 2016 23:23:10 GMT
When evaluating these optimizations, it's worth keeping in mind how the
improvements differ between the uncompressed and compressed cases.

When uncompressed:
Changing the Assignment serialization to instead use a topic index into the
corresponding member's subscription list would usually be a good thing.

I think the proposal is only worse when the topic names are small. The
Type.STRING we use in our protocol for the assignment's TOPIC_KEY_NAME is
limited in length to Short.MAX_VALUE, so each string is prefixed with 2
bytes indicating its size.

The new proposal does worse when:
2 + utf_encoded_string_payload_size < index_type_size
in other words when:
utf_encoded_string_payload_size < index_type_size - 2

If the index type ends up being Type.INT32, then the proposal is worse when
the topic is length 1.
If the index type ends up being Type.INT64, then the proposal is worse when
the topic is less than length 6.

When compressed:
As James Cheng brought up, I'm not sure how things change when compression
comes into the picture. This would be worth investigating.
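As a starting point for that investigation, here is a rough standalone
sketch using Python's zlib (the same DEFLATE algorithm gzip uses; the
topic names are made up and this is raw concatenated text, not the real
group metadata layout):

```python
import zlib

# Hypothetical 25-character topic names, with only 100 topics per member so
# each repeated block fits inside DEFLATE's 32 KB back-reference window.
subscription = "".join(f"some.cluster.topic.nam{i:03d}" for i in range(100))
one_member = subscription.encode("utf-8")   # one copy of the topic list
hundred_members = one_member * 100          # same list repeated per member

ratio_one = len(zlib.compress(one_member, 9)) / len(one_member)
ratio_many = len(zlib.compress(hundred_members, 9)) / len(hundred_members)

# DEFLATE replaces repeated strings with back-references, so the repeated
# copies compress much harder than a single copy does on its own.
print(f"one copy: {ratio_one:.3f}, repeated: {ratio_many:.3f}")
```

One caveat worth checking against the real message: with 3000 topics of
~25 characters, a single member's expanded topic list is around 75 KB,
which already exceeds DEFLATE's 32 KB window, so cross-member repeats may
be out of reach for the compressor in practice.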

On Mon, May 23, 2016 at 4:05 PM, James Cheng <wushujames@gmail.com> wrote:

>
> > On May 23, 2016, at 10:59 AM, Jason Gustafson <jason@confluent.io> wrote:
> >
> > 2. Maybe there's a better way to lay out the assignment without needing
> > to explicitly repeat the topic? For example, the leader could sort the
> > topics for each member and just use an integer to represent the index of
> > each topic within the sorted list (note this depends on the subscription
> > including the full topic list).
> >
> > Assignment -> [TopicIndex [Partition]]
> >
>
> Jason, doesn't gzip (or other compression) basically do this? If the topic
> is a string and the topic is repeated throughout, won't compression
> basically replace all repeated instances of it with an index reference to
> the full string?
>
> -James
>
> > You could even combine these two options so that you have only 3 integers
> > for each topic assignment:
> >
> > Assignment -> [TopicIndex MinPartition MaxPartition]
> >
> > There may even be better options with a little more thought. All of this
> > is just part of the client-side protocol, so it wouldn't require any
> > version bumps on the broker. What do you think?
> >
> > Thanks,
> > Jason
> >
> >
> >
> >
> > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> >
> >> The original concern is that regex may not be efficiently supported
> >> across languages, but if there is a neat workaround I would love to
> >> learn.
> >>
> >> Guozhang
> >>
> >> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma <ismael@juma.me.uk> wrote:
> >>
> >>> +1 to Jun's suggestion.
> >>>
> >>> Having said that, as a general point, I think we should consider
> >>> supporting topic patterns in the wire protocol. It requires some
> >>> thinking for cross-language support, but it seems surmountable and it
> >>> could make certain operations a lot more efficient (the fact that a
> >>> basic regex subscription causes the consumer to request metadata for
> >>> all topics is not great).
> >>>
> >>> Ismael
> >>>
> >>> On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang <wangguoz@gmail.com>
> >>> wrote:
> >>>
> >>>> I like Jun's suggestion in changing the handling logic for single
> >>>> large messages on the consumer side.
> >>>>
> >>>> As for the case of "a single group subscribing to 3000 topics", with
> >>>> 100 consumers the 2.5Mb gzip size is reasonable to me (when storing
> >>>> in ZK, we also have the znode limit which is set to 1Mb by default,
> >>>> though admittedly it is only for one consumer). And if we do the
> >>>> change as Jun suggested, 2.5Mb of memory pressure on the follower is
> >>>> OK I think.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Sat, May 21, 2016 at 12:51 PM, Onur Karaman
> >>>> <onurkaraman.apache@gmail.com> wrote:
> >>>>
> >>>>> Results without compression:
> >>>>> 1 consumer 292383 bytes
> >>>>> 5 consumers 1079579 bytes * the tipping point
> >>>>> 10 consumers 1855018 bytes
> >>>>> 20 consumers 2780220 bytes
> >>>>> 30 consumers 3705422 bytes
> >>>>> 40 consumers 4630624 bytes
> >>>>> 50 consumers 5555826 bytes
> >>>>> 60 consumers 6480788 bytes
> >>>>> 70 consumers 7405750 bytes
> >>>>> 80 consumers 8330712 bytes
> >>>>> 90 consumers 9255674 bytes
> >>>>> 100 consumers 10180636 bytes
> >>>>>
> >>>>> So it looks like gzip compression shrinks the message size by 4x.
> >>>>>
> >>>>> On Sat, May 21, 2016 at 9:47 AM, Jun Rao <jun@confluent.io> wrote:
> >>>>>
> >>>>>> Onur,
> >>>>>>
> >>>>>> Thanks for the investigation.
> >>>>>>
> >>>>>> Another option is to just fix how we deal with the case when a
> >>>>>> message is larger than the fetch size. Today, if a message is
> >>>>>> larger than the fetch size, the consumer will get stuck. Instead,
> >>>>>> we can simply return the full message if it's larger than the fetch
> >>>>>> size w/o requiring the consumer to manually adjust the fetch size.
> >>>>>> On the broker side, to serve a fetch request, we already do an
> >>>>>> index lookup and then scan the log a bit to find the message with
> >>>>>> the requested offset. We can just check the size of that message
> >>>>>> and return the full message if its size is larger than the fetch
> >>>>>> size. This way, fetch size is really a performance optimization,
> >>>>>> i.e. in the common case, we will not return more bytes than the
> >>>>>> fetch size, but if there is a large message, we will return more
> >>>>>> bytes than the specified fetch size. In practice, large messages
> >>>>>> are rare. So, it shouldn't increase the memory consumption on the
> >>>>>> client too much.
> >>>>>>
> >>>>>> Jun
> >>>>>>
> >>>>>> On Sat, May 21, 2016 at 3:34 AM, Onur Karaman
> >>>>>> <onurkaraman.apache@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hey everyone. So I started doing some tests on the new
> >>>>>>> consumer/coordinator to see if it could handle more strenuous use
> >>>>>>> cases like mirroring clusters with thousands of topics and thought
> >>>>>>> I'd share whatever I have so far.
> >>>>>>>
> >>>>>>> The scalability limit: the amount of group metadata we can fit
> >>>>>>> into one message
> >>>>>>>
> >>>>>>> Some background:
> >>>>>>> Client-side assignment is implemented in two phases
> >>>>>>> 1. a PreparingRebalance phase that identifies members of the group
> >>>>>>> and aggregates member subscriptions.
> >>>>>>> 2. an AwaitingSync phase that waits for the group leader to decide
> >>>>>>> member assignments based on the member subscriptions across the
> >>>>>>> group.
> >>>>>>>  - The leader announces this decision with a SyncGroupRequest. The
> >>>>>>> GroupCoordinator handles SyncGroupRequests by appending all group
> >>>>>>> state into a single message under the __consumer_offsets topic.
> >>>>>>> This message is keyed on the group id and contains each member
> >>>>>>> subscription as well as the decided assignment for each member.
> >>>>>>>
> >>>>>>> The environment:
> >>>>>>> - one broker
> >>>>>>> - one __consumer_offsets partition
> >>>>>>> - offsets.topic.compression.codec=1 // this is gzip
> >>>>>>> - broker has my pending KAFKA-3718 patch that actually makes use
> >>>>>>> of offsets.topic.compression.codec:
> >>>>>>> https://github.com/apache/kafka/pull/1394
> >>>>>>> - around 3000 topics. This is an actual subset of topics from one
> >>>>>>> of our clusters.
> >>>>>>> - topics have 8 partitions
> >>>>>>> - topics are 25 characters long on average
> >>>>>>> - one group with a varying number of consumers each hardcoded with
> >>>>>>> all the topics just to make the tests more consistent. wildcarding
> >>>>>>> with .* should have the same effect once the subscription hits the
> >>>>>>> coordinator as the subscription has already been fully expanded
> >>>>>>> out to the list of topics by the consumers.
> >>>>>>> - I added some log messages to Log.scala to print out the message
> >>>>>>> sizes after compression
> >>>>>>> - there are no producers at all and auto commits are disabled. The
> >>>>>>> only topic with messages getting added is the __consumer_offsets
> >>>>>>> topic and they're only from storing group metadata while
> >>>>>>> processing SyncGroupRequests.
> >>>>>>>
> >>>>>>> Results:
> >>>>>>> The results below show that we exceed the 1000012 byte
> >>>>>>> KafkaConfig.messageMaxBytes limit relatively quickly (between
> >>>>>>> 30-40 consumers):
> >>>>>>> 1 consumer 54739 bytes
> >>>>>>> 5 consumers 261524 bytes
> >>>>>>> 10 consumers 459804 bytes
> >>>>>>> 20 consumers 702499 bytes
> >>>>>>> 30 consumers 930525 bytes
> >>>>>>> 40 consumers 1115657 bytes * the tipping point
> >>>>>>> 50 consumers 1363112 bytes
> >>>>>>> 60 consumers 1598621 bytes
> >>>>>>> 70 consumers 1837359 bytes
> >>>>>>> 80 consumers 2066934 bytes
> >>>>>>> 90 consumers 2310970 bytes
> >>>>>>> 100 consumers 2542735 bytes
> >>>>>>>
> >>>>>>> Note that the growth itself is pretty gradual. Plotting the
> >>>>>>> points makes it look roughly linear w.r.t. the number of
> >>>>>>> consumers:
> >>>>>>>
> >>>>>>> https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)
> >>>>>>>
> >>>>>>> Also note that these numbers aren't averages or medians or
> >>>>>>> anything like that. It's just the byte size from a given run. I
> >>>>>>> did run them a few times and saw similar results.
> >>>>>>>
> >>>>>>> Impact:
> >>>>>>> Even after adding gzip to the __consumer_offsets topic with my
> >>>>>>> pending KAFKA-3718 patch, the AwaitingSync phase of the group
> >>>>>>> fails with RecordTooLargeException. This means the combined size
> >>>>>>> of each member's subscriptions and assignments exceeded the
> >>>>>>> KafkaConfig.messageMaxBytes of 1000012 bytes. The group ends up
> >>>>>>> dying.
> >>>>>>>
> >>>>>>> Options:
> >>>>>>> 1. Config change: reduce the number of consumers in the group.
> >>>>>>> This isn't always a realistic answer in more strenuous use cases
> >>>>>>> like MirrorMaker clusters or for auditing.
> >>>>>>> 2. Config change: split the group into smaller groups which
> >>>>>>> together will get full coverage of the topics. This gives each
> >>>>>>> group member a smaller subscription. (ex: g1 has topics starting
> >>>>>>> with a-m while g2 has topics starting with n-z). This would be
> >>>>>>> operationally painful to manage.
> >>>>>>> 3. Config change: split the topics among members of the group.
> >>>>>>> Again this gives each group member a smaller subscription. This
> >>>>>>> would also be operationally painful to manage.
> >>>>>>> 4. Config change: bump up KafkaConfig.messageMaxBytes (a
> >>>>>>> topic-level config) and KafkaConfig.replicaFetchMaxBytes (a
> >>>>>>> broker-level config). Applying messageMaxBytes to just the
> >>>>>>> __consumer_offsets topic seems relatively harmless, but bumping up
> >>>>>>> the broker-level replicaFetchMaxBytes would probably need more
> >>>>>>> attention.
> >>>>>>> 5. Config change: try different compression codecs. Based on 2
> >>>>>>> minutes of googling, it seems like lz4 and snappy are faster than
> >>>>>>> gzip but have worse compression, so this probably won't help.
> >>>>>>> 6. Implementation change: support sending the regex over the wire
> >>>>>>> instead of the fully expanded topic subscriptions. I think people
> >>>>>>> said in the past that different languages have subtle differences
> >>>>>>> in regex, so this doesn't play nicely with cross-language groups.
> >>>>>>> 7. Implementation change: maybe we can reverse the mapping?
> >>>>>>> Instead of mapping from member to subscriptions, we can map a
> >>>>>>> subscription to a list of members.
> >>>>>>> 8. Implementation change: maybe we can try to break apart the
> >>>>>>> subscription and assignments from the same SyncGroupRequest into
> >>>>>>> multiple records? They can still go to the same message set and
> >>>>>>> get appended together. This way the limit becomes the segment
> >>>>>>> size, which shouldn't be a problem. This can be tricky to get
> >>>>>>> right because we're currently keying these messages on the group,
> >>>>>>> so I think records from the same rebalance might accidentally
> >>>>>>> compact one another, but my understanding of compaction isn't that
> >>>>>>> great.
> >>>>>>>
> >>>>>>> Todo:
> >>>>>>> It would be interesting to rerun the tests with no compression
> >>>>>>> just to see how much gzip is helping but it's getting late. Maybe
> >>>>>>> tomorrow?
> >>>>>>>
> >>>>>>> - Onur
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
>
>
