kafka-dev mailing list archives

From Jason Gustafson <ja...@confluent.io>
Subject Re: [DISCUSS] scalability limits in the coordinator
Date Fri, 10 Jun 2016 18:29:14 GMT
Hey Becket,

My suggestion was pretty far from a completely thought-out proposal, but
the advantages of having your MM cluster maintain subscriptions/assignments
in its own topic are the following:

1. It solves the immediate problem of the size of the group metadata.
2. It distributes the subscription/assignment data among the brokers, so
the coordinator is not a bottleneck.
3. Subscriptions are fixed at startup, so maybe you only need to write them
once, which saves some network overhead on each rebalance. The leader is
currently sticky as well, so perhaps you could cache them and save lookup
overhead, which probably doesn't have much of an impact on memory since
subscriptions are typically the same for all consumers (except during
rolling update).

The disadvantage is that it's clearly more complex than letting the
coordinator do all the work. But it seems at this point like the other
solutions are more like workarounds.

-Jason




On Thu, Jun 9, 2016 at 10:52 PM, Becket Qin <becket.qin@gmail.com> wrote:

> Hi Jason,
>
> I am trying to understand the gain of saving the assignment and metadata in
> a topic and returning the offsets to the consumers. This obviously reduces
> the memory footprint, as we agreed before. But does it save network bandwidth?
> The consumers still need to read the same amount of data from the
> coordinator broker, right?
>
> In terms of the current proposal, Onur and I discussed it offline. It looks
> like having a separate "offsets.replica.fetch.max.bytes" does not buy us
> much. Users are likely to set it to the same value as the max message
> size of the __consumer_offsets topic. The reason is that this configuration is
> mainly for consumer-side memory management, so users would not want to set
> it higher than necessary.
>
> The solution we discussed is what Jun suggested, i.e. look at the size of
> the first message returned. If the size of the message is greater than the
> fetch max bytes, we always return the first message even if it is bigger than
> the max fetch size. Otherwise we only return up to fetch max bytes. We only do
> this for the __consumer_offsets topic so no user topic will be impacted.
>
> Thanks,
>
> Jiangjie (Becket) Qin
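
A minimal sketch of the fetch rule described above (always return the first message, otherwise stop at the limit), assuming messages are represented only by their sizes in bytes. The function name `select_messages` and its parameters are hypothetical and are not part of Kafka.

```python
def select_messages(message_sizes, fetch_max_bytes):
    """Return the sizes of the messages one fetch response would carry.

    Hypothetical helper: the first message is always included, even when it
    alone exceeds fetch_max_bytes; later messages are added only while the
    running total stays within the limit.
    """
    selected = []
    total = 0
    for size in message_sizes:
        # Only enforce the limit once at least one message has been selected.
        if selected and total + size > fetch_max_bytes:
            break
        selected.append(size)
        total += size
    return selected


# An oversized first message is still returned on its own.
oversized = select_messages([1500, 200], fetch_max_bytes=1000)
# Normal case: stop before the limit would be exceeded.
normal = select_messages([400, 400, 400], fetch_max_bytes=1000)
```

With this rule the fetch size remains a limit in the common case and is relaxed only when a single message could otherwise never be fetched.
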
>
> On Thu, Jun 9, 2016 at 2:40 PM, Jason Gustafson <jason@confluent.io>
> wrote:
>
> > Hi Onur,
> >
> > I didn't have a specific proposal in mind, I was just thinking analogously
> > with how Connect ensures task configurations are propagated to tasks
> > consistently when it rebalances the cluster. The high level concept is to
> > take the assignment data out of the rebalance protocol itself and replace
> > it with a pointer. For example, the pointer can be a (topic, partition,
> > offset) tuple which can be fetched separately by the consumer instance
> > after the rebalance completes. Then the leader would generate the
> > assignments, write them to Kafka, and send the pointers to the rest of the
> > group in the SyncGroup. This is more or less how Connect works.
> >
> > -Jason
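
A rough sketch of the pointer idea described above, assuming an in-memory list stands in for a Kafka topic partition. `InMemoryLog`, `leader_sync`, and the topic name "mm-assignments" are hypothetical names used only for illustration.

```python
from typing import Dict, List, NamedTuple, Tuple


class Pointer(NamedTuple):
    """The (topic, partition, offset) tuple sent in place of the assignment."""
    topic: str
    partition: int
    offset: int


class InMemoryLog:
    """Stand-in for Kafka: one append-only list per (topic, partition)."""

    def __init__(self) -> None:
        self._logs: Dict[Tuple[str, int], list] = {}

    def append(self, topic: str, partition: int, value) -> Pointer:
        log = self._logs.setdefault((topic, partition), [])
        log.append(value)
        return Pointer(topic, partition, len(log) - 1)

    def read(self, pointer: Pointer):
        return self._logs[(pointer.topic, pointer.partition)][pointer.offset]


def leader_sync(log: InMemoryLog, assignments: Dict[str, List[Tuple[str, int]]]) -> Dict[str, Pointer]:
    """Leader writes each member's assignment and returns member -> pointer.

    Only the small pointers would travel through the group protocol.
    """
    return {
        member: log.append("mm-assignments", 0, partitions)
        for member, partitions in assignments.items()
    }


log = InMemoryLog()
pointers = leader_sync(log, {"c1": [("t", 0)], "c2": [("t", 1)]})
# After the rebalance, each member fetches its own assignment separately.
c2_assignment = log.read(pointers["c2"])
```
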
> >
> > On Thu, Jun 9, 2016 at 11:10 AM, Onur Karaman <okaraman@linkedin.com.invalid> wrote:
> >
> > > I think the value of adding an "offsets.replica.fetch.max.bytes" config is
> > > that we don't break/change the meaning of "replica.fetch.max.bytes".
> > >
> > > We can also set "offsets.replica.fetch.max.bytes" to a value safely
> > > larger than what we expect to ever allow the __consumer_offsets topic max
> > > message size to be, without doing the larger change of bumping up the
> > > global "replica.fetch.max.bytes".
> > >
> > > On Thu, Jun 9, 2016 at 10:40 AM, Becket Qin <becket.qin@gmail.com> wrote:
> > >
> > > > I think taking the bigger of the fetch size and the message size limit is
> > > > probably good enough. If we have a separate
> > > > "offsets.replica.fetch.max.bytes", I guess the value will always be set to
> > > > the max message size of the __consumer_offsets topic, which does not seem
> > > > to have much value.
> > > >
> > > > On Thu, Jun 9, 2016 at 3:15 AM, Onur Karaman <okaraman@linkedin.com.invalid> wrote:
> > > >
> > > > > Maybe another approach can be to add a new
> > > > > "offsets.replica.fetch.max.bytes" config on the brokers.
> > > > >
> > > > > On Thu, Jun 9, 2016 at 3:03 AM, Onur Karaman <okaraman@linkedin.com> wrote:
> > > > >
> > > > > > I made a PR with a tweak to Jun's/Becket's proposal:
> > > > > > https://github.com/apache/kafka/pull/1484
> > > > > >
> > > > > > It just tweaks the fetch behavior specifically for replicas fetching from
> > > > > > the __consumer_offsets topic: when the fetcher's "replica.fetch.max.bytes"
> > > > > > is less than the __consumer_offsets leader's "message.max.bytes", it takes
> > > > > > the max of the two.
> > > > > >
> > > > > > I'm honestly not that happy with this solution, as I'd rather not change
> > > > > > the "replica.fetch.max.bytes" config from being a limit to a
> > > > > > recommendation. I'd definitely be happy to hear other alternatives!
> > > > > >
> > > > > > On Sun, May 29, 2016 at 1:57 PM, Onur Karaman <onurkaraman.apache@gmail.com> wrote:
> > > > > >
> > > > > > >> Sorry I know next to nothing about Kafka Connect. I didn't understand the
> > > > > > >> Kafka Connect / MM idea you brought up. Can you go into more detail?
> > > > > > >>
> > > > > > >> Otherwise I think our remaining options are:
> > > > > > >> - Jun's suggestion to bump up the KafkaConfig.messageMaxBytes for the
> > > > > > >> __consumer_offsets topic and change the fetch behavior when the message
> > > > > > >> size is larger than the fetch size
> > > > > > >> - option 6: support sending the regex over the wire instead of the fully
> > > > > > >> expanded topic subscriptions. This should cut down the message size from
> > > > > > >> the subscription side. Again this only helps when pattern-based
> > > > > > >> subscriptions are done.
> > > > > > >>
> > > > > > >> minor correction to an earlier comment I made regarding the message size:
> > > > > > >> message size ~ sum(s_i + a_i for i in range [1, |C|])
> > > > > >>
> > > > > > >> On Thu, May 26, 2016 at 3:35 PM, Jason Gustafson <jason@confluent.io> wrote:
> > > > > > >>
> > > > > > >> > Hey Onur,
> > > > > > >> >
> > > > > > >> > Thanks for the investigation. It seems the conclusion is that the compact
> > > > > > >> > format helps, but perhaps not enough to justify adding a new assignment
> > > > > > >> > schema? I'm not sure there's much more room for savings unless we change
> > > > > > >> > something more fundamental in the assignment approach. We spent some time
> > > > > > >> > thinking before about whether we could let the consumers compute their
> > > > > > >> > assignment locally from a smaller set of information, but the difficulty
> > > > > > >> > (I'm sure you remember) is reaching consensus on topic metadata. Kafka
> > > > > > >> > Connect has a similar problem where all the workers need to agree on
> > > > > > >> > connector configurations. Since all configs are stored in a single topic
> > > > > > >> > partition, the approach we take there is to propagate the offset in the
> > > > > > >> > assignment protocol. Not sure if we could do something similar for MM...
> > > > > > >> > Anyway, it seems like the best workaround at the moment is Jun's initial
> > > > > > >> > suggestion. What do you think?
> > > > > >> >
> > > > > >> > -Jason
> > > > > >> >
> > > > > > >> > On Wed, May 25, 2016 at 10:47 PM, Onur Karaman <onurkaraman.apache@gmail.com> wrote:
> > > > > > >> >
> > > > > > >> > > I gave the topic index assignment trick a try against the same
> > > > > > >> > > environment. The implementation just changed the assignment serialization
> > > > > > >> > > and deserialization logic. It didn't change SyncGroupResponse, meaning it
> > > > > > >> > > continues to exclude the subscription from the SyncGroupResponse and
> > > > > > >> > > assumes the member has kept track of its last subscription.
> > > > > >> > >
> > > > > >> > > Assignment topic indexing with compression:
> > > > > >> > > 1 consumer 34346 bytes
> > > > > >> > > 5 consumers 177687 bytes
> > > > > >> > > 10 consumers 331897 bytes
> > > > > >> > > 20 consumers 572467 bytes
> > > > > >> > > 30 consumers 811269 bytes
> > > > > >> > > 40 consumers 1047188 bytes * the tipping point
> > > > > >> > > 50 consumers 1290092 bytes
> > > > > >> > > 60 consumers 1527806 bytes
> > > > > >> > > 70 consumers 1769259 bytes
> > > > > >> > > 80 consumers 2000118 bytes
> > > > > >> > > 90 consumers 2244392 bytes
> > > > > >> > > 100 consumers 2482415 bytes
> > > > > >> > >
> > > > > >> > > Assignment topic indexing without compression:
> > > > > >> > > 1 consumer 211904 bytes
> > > > > >> > > 5 consumers 677184 bytes
> > > > > >> > > 10 consumers 1211154 bytes * the tipping point
> > > > > >> > > 20 consumers 2136196 bytes
> > > > > >> > > 30 consumers 3061238 bytes
> > > > > >> > > 40 consumers 3986280 bytes
> > > > > >> > > 50 consumers 4911322 bytes
> > > > > >> > > 60 consumers 5836284 bytes
> > > > > >> > > 70 consumers 6761246 bytes
> > > > > >> > > 80 consumers 7686208 bytes
> > > > > >> > > 90 consumers 8611170 bytes
> > > > > >> > > 100 consumers 9536132 bytes
> > > > > >> > >
> > > > > > >> > > Assignment topic indexing seems to reduce the size by 500KB without
> > > > > > >> > > compression and 80KB with compression. So assignment topic indexing makes
> > > > > > >> > > some difference both with and without compression, but in our case it was
> > > > > > >> > > not nearly enough.
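
To make the "tipping point" in the two tables above concrete, here is a small sketch that linearly interpolates between the measured points to estimate the group size at which the message crosses a size limit. The limit of 1,000,000 bytes is an assumption (the thread does not state the exact configured maximum), and `crossing_point` is a hypothetical helper.

```python
# (consumers, bytes) pairs copied from the tables above.
WITH_COMPRESSION = [
    (1, 34346), (5, 177687), (10, 331897), (20, 572467), (30, 811269),
    (40, 1047188), (50, 1290092), (60, 1527806), (70, 1769259),
    (80, 2000118), (90, 2244392), (100, 2482415),
]
WITHOUT_COMPRESSION = [
    (1, 211904), (5, 677184), (10, 1211154), (20, 2136196), (30, 3061238),
    (40, 3986280), (50, 4911322), (60, 5836284), (70, 6761246),
    (80, 7686208), (90, 8611170), (100, 9536132),
]

ASSUMED_LIMIT_BYTES = 1_000_000  # assumption, not taken from the thread


def crossing_point(points, limit):
    """Estimate the group size at which the message size first exceeds limit.

    Uses straight-line interpolation between adjacent measurements and
    returns None if the limit is never crossed within the measured range.
    """
    for (n0, s0), (n1, s1) in zip(points, points[1:]):
        if s0 <= limit < s1:
            return n0 + (limit - s0) * (n1 - n0) / (s1 - s0)
    return None


compressed_limit = crossing_point(WITH_COMPRESSION, ASSUMED_LIMIT_BYTES)
uncompressed_limit = crossing_point(WITHOUT_COMPRESSION, ASSUMED_LIMIT_BYTES)
```

Under that assumed limit, the estimate falls between 30 and 40 consumers with compression and between 5 and 10 without, which matches the rows marked as the tipping point.
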
> > > > > >> > >
> > > > > > >> > > This can be explained by the fact that we aren't actually hitting the
> > > > > > >> > > worst case scenario of each consumer being assigned a partition from every
> > > > > > >> > > topic. The reason is simple: a topic can only fully span all the consumers
> > > > > > >> > > if it has at least as many partitions as there are consumers. Given that
> > > > > > >> > > there are 8 partitions per topic and we have 100 consumers, it makes sense
> > > > > > >> > > that we aren't close to this worst case scenario where topic indexing
> > > > > > >> > > would make a bigger difference.
> > > > > >> > >
> > > > > > >> > > I tweaked the group leader's assignment code to print out the assignments
> > > > > > >> > > and found that each consumer was getting either 238 or 239 partitions.
> > > > > > >> > > Each of these partitions was from a unique topic. So the consumers were
> > > > > > >> > > really getting partitions from 239 topics instead of the full worst case
> > > > > > >> > > scenario of 3000 topics.
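
The effect described above can be reproduced with a toy simulation. This sketch assumes a simple round-robin assignment over exactly 3000 topics with 8 partitions each and 100 consumers; the assignor actually used and the exact topic count in the test environment are not stated here, so the per-consumer count (240 in this sketch) is close to, but not the same as, the 238 or 239 observed. `round_robin` is a hypothetical helper.

```python
from itertools import cycle


def round_robin(num_topics, partitions_per_topic, num_consumers):
    """Deal (topic, partition) pairs to consumers one at a time, in order."""
    assignment = {consumer: [] for consumer in range(num_consumers)}
    consumers = cycle(range(num_consumers))
    for topic in range(num_topics):
        for partition in range(partitions_per_topic):
            assignment[next(consumers)].append((topic, partition))
    return assignment


assignment = round_robin(num_topics=3000, partitions_per_topic=8, num_consumers=100)
partitions_per_consumer = {len(parts) for parts in assignment.values()}
topics_per_consumer = {len({topic for topic, _ in parts}) for parts in assignment.values()}
```

Because a topic has only 8 partitions, it can reach at most 8 of the 100 consumers, so each consumer sees a small fraction of the 3000 topics and a per-assignment topic index saves relatively little.
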
> > > > > >> > >
> > > > > > >> > > On Wed, May 25, 2016 at 1:42 PM, Jason Gustafson <jason@confluent.io> wrote:
> > > > > > >> > >
> > > > > > >> > > > Gwen, Joel:
> > > > > > >> > > >
> > > > > > >> > > > That's correct. The protocol does allow us to give an assignor its own
> > > > > > >> > > > assignment schema, but I think this will require a couple internal changes
> > > > > > >> > > > to the consumer to make use of the full generality.
> > > > > > >> > > >
> > > > > > >> > > > One thing I'm a little uncertain about is whether we should use a
> > > > > > >> > > > different protocol type. For a little context, the group membership
> > > > > > >> > > > protocol allows the client to provide a "protocol type" when joining the
> > > > > > >> > > > group to ensure that all members have some basic semantic compatibility.
> > > > > > >> > > > For example, the consumer uses "consumer" and Kafka Connect uses
> > > > > > >> > > > "connect." Currently all assignors using the "consumer" protocol share a
> > > > > > >> > > > common schema for representing subscriptions and assignment. This is
> > > > > > >> > > > convenient for tools (like consumer-groups.sh) since they just need to
> > > > > > >> > > > know how to parse the "consumer" protocol type without knowing anything
> > > > > > >> > > > about the assignors. So introducing another schema would break that
> > > > > > >> > > > assumption and we'd need those tools to do assignor-specific parsing.
> > > > > > >> > > > Maybe this is OK? Alternatively, we could use a separate protocol type
> > > > > > >> > > > (e.g. "compact-consumer"), but that seems less than desirable.
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > > Jason
> > > > > >> > > >
> > > > > > >> > > > On Wed, May 25, 2016 at 11:00 AM, Gwen Shapira <gwen@confluent.io> wrote:
> > > > > > >> > > >
> > > > > > >> > > > > ah, right - we can add as many strategies as we want.
> > > > > >> > > > >
> > > > > > >> > > > > On Wed, May 25, 2016 at 10:54 AM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > > Yes it would be a protocol bump.
> > > > > > >> > > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > Sorry - I'm officially confused. I think it may not be required - since the
> > > > > > >> > > > > > more compact format would be associated with a new assignment strategy -
> > > > > > >> > > > > > right?
> > > > > > >> > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > > smaller than the plaintext PAL, but the post-compressed binary PAL is just
> > > > > > >> > > > > > > 25% smaller than the post-compressed plaintext PAL. IOW using a symbol
> > > > > > >> > > > > > > table helps a lot but further compression on that already compact format
> > > > > > >> > > > > > > would yield only marginal return.
> > > > > > >> > > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > > So basically I feel we could get pretty far with a more compact field
> > > > > > >> > > > > > > format for assignment and if we do that then we would potentially not even
> > > > > > >> > > > > > > want to do any compression.
> > > > > > >> > > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > Also just wanted to add that this compression on the binary PAL did help
> > > > > > >> > > > > > but the compression ratio was obviously not as high as plaintext
> > > > > > >> > > > > > compression.
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > > >
> > > > > > >> > > > > > > On Tue, May 24, 2016 at 4:19 PM, Gwen Shapira <gwen@confluent.io> wrote:
> > > > > > >> > > > > > >
> > > > > > >> > > > > > >> Regarding the change to the assignment field. It would be a protocol bump,
> > > > > > >> > > > > > >> otherwise consumers will not know how to parse the bytes the broker is
> > > > > > >> > > > > > >> returning, right?
> > > > > > >> > > > > > >> Or did I misunderstand the suggestion?
> > > > > >> > > > > > >>
> > > > > > >> > > > > > >> On Tue, May 24, 2016 at 2:52 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > > >> > > > > > >>
> > > > > > >> > > > > > >> > I think for just solving issue 1), Jun's suggestion is sufficient and
> > > > > > >> > > > > > >> > simple. So I'd prefer that approach.
> > > > > > >> > > > > > >> >
> > > > > > >> > > > > > >> > In addition, Jason's optimization on the assignment field would be good
> > > > > > >> > > > > > >> > for 2) and 3) as well, and I like that optimization for its simplicity
> > > > > > >> > > > > > >> > and no format change as well. And in the future I'm in favor of
> > > > > > >> > > > > > >> > considering changing the in-memory cache format as Jiangjie suggested.
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > Guozhang
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> >
> > > > > > >> > > > > > >> > On Tue, May 24, 2016 at 12:42 PM, Becket Qin <becket.qin@gmail.com> wrote:
> > > > > > >> > > > > > >> >
> > > > > > >> > > > > > >> > > Hi Jason,
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > There are a few problems we want to solve here:
> > > > > > >> > > > > > >> > > 1. The group metadata is too big to be appended to the log.
> > > > > > >> > > > > > >> > > 2. Reduce the memory footprint on the broker
> > > > > > >> > > > > > >> > > 3. Reduce the bytes transferred over the wire.
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > To solve (1), I like your idea of having separate messages per member.
> > > > > > >> > > > > > >> > > The proposal (Onur's option 8) is to break metadata into small records in
> > > > > > >> > > > > > >> > > the same uncompressed message set so each record is small. I agree it
> > > > > > >> > > > > > >> > > would be ideal if we are able to store the metadata separately for each
> > > > > > >> > > > > > >> > > member. I was also thinking about storing the metadata in multiple
> > > > > > >> > > > > > >> > > messages, too. What concerns me is that having multiple messages seems
> > > > > > >> > > > > > >> > > to break the atomicity. I am not sure how we are going to deal with the
> > > > > > >> > > > > > >> > > potential issues. For example, what if the group metadata is replicated
> > > > > > >> > > > > > >> > > but the member metadata is not? It might be fine depending on the
> > > > > > >> > > > > > >> > > implementation, but I am not sure.
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > For (2) we want to store the metadata on disk, which is what we have to
> > > > > > >> > > > > > >> > > do anyway. The only question is in what format we should store it.
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > To address (3) we want the metadata to be compressed, which contradicts
> > > > > > >> > > > > > >> > > the above solution for (1).
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > I think Jun's suggestion is probably still the simplest. To avoid
> > > > > > >> > > > > > >> > > changing the behavior for consumers, maybe we can do that only for
> > > > > > >> > > > > > >> > > offset_topic, i.e., if the max fetch bytes of the fetch request is
> > > > > > >> > > > > > >> > > smaller than the message size on the offset topic, we always return at
> > > > > > >> > > > > > >> > > least one full message. This should avoid unexpected problems on the
> > > > > > >> > > > > > >> > > client side because supposedly only tools and brokers will fetch from
> > > > > > >> > > > > > >> > > the internal topics.
> > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > As a modification to what you suggested, one solution I was thinking of
> > > > > > >> > > > > > >> > > was to have multiple messages in a single compressed message. That means
> > > > > > >> > > > > > >> > > for SyncGroupResponse we still need to read the entire compressed message
> > > > > > >> > > > > > >> > > and extract the inner messages, which seems not much different from
> > > > > > >> > > > > > >> > > having a single message containing everything. But let me just put it
> > > > > > >> > > > > > >> > > here and see if that makes sense.
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > We can have a map of GroupMetadataKey -> GroupMetadataValueOffset.
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > The GroupMetadataValue is stored in a compressed message. The inner
> > > > > > >> > > > > > >> > > messages are the following:
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > Inner Message 0: Version GroupId Generation
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > Inner Message 1: MemberId MemberMetadata_1 (we can compress the bytes here)
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > Inner Message 2: MemberId MemberMetadata_2
> > > > > > >> > > > > > >> > > ....
> > > > > > >> > > > > > >> > > Inner Message N: MemberId MemberMetadata_N
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > The MemberMetadata format is the following:
> > > > > > >> > > > > > >> > >   MemberMetadata => Version Generation ClientId Host Subscription Assignment
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > So DescribeGroupResponse will just return the entire compressed
> > > > > > >> > > > > > >> > > GroupMetadataMessage. SyncGroupResponse will return the corresponding
> > > > > > >> > > > > > >> > > inner message.
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > Thanks,
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > Jiangjie (Becket) Qin
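
A minimal sketch of the layout proposed above: one header record plus one record per member, packed into a single compressed blob. It assumes JSON and zlib purely for illustration (the thread does not specify an encoding), and the function names `build_group_message`, `describe_group`, and `sync_group` are hypothetical.

```python
import json
import zlib


def build_group_message(version, group_id, generation, members):
    """Pack inner message 0 (group header) and one inner message per member."""
    inner = [{"version": version, "group_id": group_id, "generation": generation}]
    inner += [
        {"member_id": member_id, "metadata": metadata}
        for member_id, metadata in members.items()
    ]
    return zlib.compress(json.dumps(inner).encode("utf-8"))


def describe_group(blob):
    """DescribeGroup: hand back the whole compressed message unchanged."""
    return blob


def sync_group(blob, member_id):
    """SyncGroup: decompress everything, then return only one member's record."""
    inner = json.loads(zlib.decompress(blob).decode("utf-8"))
    for record in inner[1:]:  # inner[0] is the group header
        if record["member_id"] == member_id:
            return record["metadata"]
    raise KeyError(member_id)


blob = build_group_message(
    version=0,
    group_id="mm-group",
    generation=7,
    members={
        "m1": {"subscription": ["a", "b"], "assignment": ["a-0"]},
        "m2": {"subscription": ["a", "b"], "assignment": ["b-0"]},
    },
)
m2_metadata = sync_group(blob, "m2")
```

As the message itself notes, `sync_group` still has to decompress the whole blob to reach one member, so this layout mainly helps the stored size rather than the read path.
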
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > On Tue, May 24, 2016 at 9:14 AM, Jason Gustafson <jason@confluent.io> wrote:
> > > > > > >> > > > > > >> > >
> > > > > > >> > > > > > >> > > > Hey Becket,
> > > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > I like your idea to store only the offset for the group metadata in
> > > > > > >> > > > > > >> > > > memory. I think it would be safe to keep it in memory for a short time
> > > > > > >> > > > > > >> > > > after the rebalance completes, but after that, its only real purpose is
> > > > > > >> > > > > > >> > > > to answer DescribeGroup requests, so your proposal makes a lot of sense
> > > > > > >> > > > > > >> > > > to me.
> > > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > As for the specific problem with the size of the group metadata message
> > > > > > >> > > > > > >> > > > for the MM case, if we cannot succeed in reducing the size of the
> > > > > > >> > > > > > >> > > > subscription/assignment (which I think is still probably the best
> > > > > > >> > > > > > >> > > > alternative if it can work), then I think there are some options for
> > > > > > >> > > > > > >> > > > changing the message format (option #8 in Onur's initial e-mail).
> > > > > > >> > > > > > >> > > > Currently, the key used for storing the group metadata is this:
> > > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > GroupMetadataKey => Version GroupId
> > > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > And the value is something like this (some details elided):
> > > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > GroupMetadataValue => Version GroupId Generation [MemberMetadata]
> > > > > > >> > > > > > >> > > >   MemberMetadata => ClientId Host Subscription Assignment
> > > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > I don't think we can change the key without a lot of pain, but it seems
> > > > > > >> > > > > > >> > > > like we can change the value format. Maybe we can take the
> > > > > > >> > > > > > >> > > > subscription/assignment payloads out of the value and introduce a new
> > > > > > >> > > > > > >> > > > "MemberMetadata" message for each member in the group. For example:
> > > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > MemberMetadataKey => Version GroupId MemberId
> > > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > MemberMetadataValue => Version Generation ClientId Host Subscription Assignment
> > > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > When a new generation is created, we would first write the group
> > > > > > >> > > > > > >> > > > metadata message which includes the generation and all of the memberIds,
> > > > > > >> > > > > > >> > > > and then we'd write the member metadata messages. To answer the
> > > > > > >> > > > > > >> > > > DescribeGroup request, we'd read the group metadata at the cached offset
> > > > > > >> > > > > > >> > > > and, depending on the version, all of the following member metadata.
> > > > > > >> > > > > > >> > > > This would be more complex to maintain, but it seems doable if it comes
> > > > > > >> > > > > > >> > > > to it.
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > > > Thanks,
> > > > > >> > > > > > >> > > > Jason
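
The write and read order described above can be sketched as follows, assuming a Python list stands in for the offsets topic partition and that a group's member records immediately follow its group record (in a real partition, records from other groups could be interleaved, which this sketch ignores). `write_generation` and `describe_group` are hypothetical names.

```python
def write_generation(log, group_id, generation, members):
    """Append the group record, then one record per member.

    Returns the offset of the group record, which is what the coordinator
    would cache in memory.
    """
    group_offset = len(log)
    member_ids = sorted(members)
    log.append((("group", group_id), {"generation": generation, "member_ids": member_ids}))
    for member_id in member_ids:
        key = ("member", group_id, member_id)
        value = {"generation": generation, **members[member_id]}
        log.append((key, value))
    return group_offset


def describe_group(log, group_offset):
    """Read the group record at the cached offset plus the member records after it."""
    _, group = log[group_offset]
    count = len(group["member_ids"])
    member_records = log[group_offset + 1 : group_offset + 1 + count]
    return group, {key[2]: value for key, value in member_records}


log = []
cached_offset = write_generation(
    log,
    group_id="mm-group",
    generation=5,
    members={"b": {"assignment": ["t-1"]}, "a": {"assignment": ["t-0"]}},
)
group, members = describe_group(log, cached_offset)
```

Each record stays small because subscriptions and assignments live in the per-member values, at the cost of the group state no longer being written as one atomic message.
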
> > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > On Mon, May 23, 2016 at 6:15 PM, Becket Qin <becket.qin@gmail.com> wrote:
> > > > > > >> > > > > > >> > > >
> > > > > > >> > > > > > >> > > > > It might be worth thinking a little further. We have discussed before
> > > > > > >> > > > > > >> > > > > that we want to avoid holding all the group metadata in memory.
> > > > > > >> > > > > > >> > > > >
> > > > > > >> > > > > > >> > > > > I am thinking about the following end state:
> > > > > > >> > > > > > >> > > > >
> > > > > > >> > > > > > >> > > > > 1. Enable compression on the offset topic.
> > > > > > >> > > > > > >> > > > > 2. Instead of holding the entire group metadata in memory on the
> > > > > > >> > > > > > >> > > > > brokers, each broker only keeps a [group -> Offset] map; the offset
> > > > > > >> > > > > > >> > > > > points to the message in the offset topic which holds the latest
> > > > > > >> > > > > > >> > > > > metadata of the group.
> > > > > > >> > > > > > >> > > > > 3. DescribeGroupResponse will read from the offset topic directly like
> > > > > > >> > > > > > >> > > > > a normal consumption, except that exactly one message will be returned.
> > > > > > >> > > > > > >> > > > > 4. SyncGroupResponse will read the message, extract the assignment part
> > > > > > >> > > > > > >> > > > > and send back the partition assignment. We can compress the partition
> > > > > > >> > > > > > >> > > > > assignment before sending it out if we want.
> > > > > >> > > > > > >> > > > >
> > > > > >> > > > > > >> > > > > Jiangjie (Becket) Qin
> > > > > >> > > > > > >> > > > >
> > > > > >> > > > > > >> > > > > On Mon, May 23, 2016 at 5:08 PM, Jason
> > > Gustafson
> > > > <
> > > > > >> > > > > > >> jason@confluent.io
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > > > wrote:
> > > > > >> > > > > > >> > > > >
> > > > > >> > > > > > >> > > > > > >
> > > > > >> > > > > > >> > > > > > > Jason, doesn't gzip (or other
> > compression)
> > > > > >> basically
> > > > > >> > > do
> > > > > >> > > > > > this?
> > > > > >> > > > > > >> If
> > > > > >> > > > > > >> > > the
> > > > > >> > > > > > >> > > > > > topic
> > > > > >> > > > > > >> > > > > > > is a string and the topic is repeated
> > > > > throughout,
> > > > > >> > > won't
> > > > > >> > > > > > >> > compression
> > > > > >> > > > > > >> > > > > > > basically replace all repeated
> instances
> > of
> > > > it
> > > > > >> with
> > > > > >> > an
> > > > > >> > > > > index
> > > > > >> > > > > > >> > > > reference
> > > > > >> > > > > > >> > > > > to
> > > > > >> > > > > > >> > > > > > > the full string?
> > > > > >> > > > > > >> > > > > >
> > > > > >> > > > > > >> > > > > >
> > > > > >> > > > > > >> > > > > > Hey James, yeah, that's probably true,
> but
> > > keep
> > > > > in
> > > > > >> > mind
> > > > > >> > > > that
> > > > > >> > > > > > the
> > > > > >> > > > > > >> > > > > > compression happens on the broker side.
> It
> > > > would
> > > > > be
> > > > > >> > nice
> > > > > >> > > > to
> > > > > >> > > > > > >> have a
> > > > > >> > > > > > >> > > more
> > > > > >> > > > > > >> > > > > > compact representation so that we get some
> > > benefit
> > > > > >> over
> > > > > >> > the
> > > > > >> > > > > wire
> > > > > >> > > > > > as
> > > > > >> > > > > > >> > > well.
> > > > > >> > > > > > >> > > > > This
> > > > > >> > > > > > >> > > > > > seems to be less of a concern here, so
> the
> > > > bigger
> > > > > >> > gains
> > > > > >> > > > are
> > > > > >> > > > > > >> > probably
> > > > > >> > > > > > >> > > > from
> > > > > >> > > > > > >> > > > > > reducing the number of partitions that
> need
> > > to
> > > > be
> > > > > >> > listed
> > > > > >> > > > > > >> > > individually.
> > > > > >> > > > > > >> > > > > >
> > > > > >> > > > > > >> > > > > > -Jason
> > > > > >> > > > > > >> > > > > >
> > > > > >> > > > > > >> > > > > > On Mon, May 23, 2016 at 4:23 PM, Onur
> > > Karaman <
> > > > > >> > > > > > >> > > > > > onurkaraman.apache@gmail.com>
> > > > > >> > > > > > >> > > > > > wrote:
> > > > > >> > > > > > >> > > > > >
> > > > > >> > > > > > >> > > > > > > When figuring out these optimizations,
> > it's
> > > > > worth
> > > > > >> > > > keeping
> > > > > >> > > > > in
> > > > > >> > > > > > >> mind
> > > > > >> > > > > > >> > > the
> > > > > >> > > > > > >> > > > > > > improvements when the message is
> > > uncompressed
> > > > > vs
> > > > > >> > when
> > > > > >> > > > it's
> > > > > >> > > > > > >> > > > compressed.
> > > > > >> > > > > > >> > > > > > >
> > > > > >> > > > > > >> > > > > > > When uncompressed:
> > > > > >> > > > > > >> > > > > > > Fixing the Assignment serialization to
> > > > instead
> > > > > >> be a
> > > > > >> > > > topic
> > > > > >> > > > > > >> index
> > > > > >> > > > > > >> > > into
> > > > > >> > > > > > >> > > > > the
> > > > > >> > > > > > >> > > > > > > corresponding member's subscription
> list
> > > > would
> > > > > >> > usually
> > > > > >> > > > be
> > > > > >> > > > > a
> > > > > >> > > > > > >> good
> > > > > >> > > > > > >> > > > thing.
> > > > > >> > > > > > >> > > > > > >
> > > > > >> > > > > > >> > > > > > > I think the proposal is only worse when
> > the
> > > > > topic
> > > > > >> > > names
> > > > > >> > > > > are
> > > > > >> > > > > > >> > small.
> > > > > >> > > > > > >> > > > The
> > > > > >> > > > > > >> > > > > > > Type.STRING we use in our protocol for
> > the
> > > > > >> > > assignment's
> > > > > >> > > > > > >> > > > TOPIC_KEY_NAME
> > > > > >> > > > > > >> > > > > is
> > > > > >> > > > > > >> > > > > > > limited in length to Short.MAX_VALUE,
> so
> > > our
> > > > > >> strings
> > > > > >> > > are
> > > > > >> > > > > > first
> > > > > >> > > > > > >> > > > > prepended
> > > > > >> > > > > > >> > > > > > > with 2 bytes to indicate the string
> size.
> > > > > >> > > > > > >> > > > > > >
> > > > > >> > > > > > >> > > > > > > The new proposal does worse when:
> > > > > >> > > > > > >> > > > > > > 2 + utf_encoded_string_payload_size <
> > > > > >> > index_type_size
> > > > > >> > > > > > >> > > > > > > in other words when:
> > > > > >> > > > > > >> > > > > > > utf_encoded_string_payload_size <
> > > > > >> index_type_size -
> > > > > >> > 2
> > > > > >> > > > > > >> > > > > > >
> > > > > >> > > > > > >> > > > > > > If the index type ends up being
> > Type.INT32,
> > > > > then
> > > > > >> the
> > > > > >> > > > > > proposal
> > > > > >> > > > > > >> is
> > > > > >> > > > > > >> > > > worse
> > > > > >> > > > > > >> > > > > > when
> > > > > >> > > > > > >> > > > > > > the topic is length 1.
> > > > > >> > > > > > >> > > > > > > If the index type ends up being
> > Type.INT64,
> > > > > then
> > > > > >> the
> > > > > >> > > > > > proposal
> > > > > >> > > > > > >> is
> > > > > >> > > > > > >> > > > worse
> > > > > >> > > > > > >> > > > > > when
> > > > > >> > > > > > >> > > > > > > the topic is less than length 6.
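The break-even condition above can be checked with a few lines of Python. This is only a sketch of the arithmetic in the message: the function names are hypothetical, and the 2-byte prefix and the 4- and 8-byte index widths are taken from the Type.STRING, Type.INT32 and Type.INT64 sizes discussed there.

```python
# Bytes needed to name a topic as a length-prefixed string versus as a
# fixed-width index. All names here are illustrative, not Kafka APIs.
STRING_LENGTH_PREFIX = 2  # Type.STRING is preceded by a 2-byte length
INT32_SIZE = 4
INT64_SIZE = 8


def string_encoding_size(topic: str) -> int:
    """Size of the topic when written as a length-prefixed UTF-8 string."""
    return STRING_LENGTH_PREFIX + len(topic.encode("utf-8"))


def index_is_worse(topic: str, index_type_size: int) -> bool:
    """True when the index costs more bytes than the string it replaces."""
    return string_encoding_size(topic) < index_type_size


if __name__ == "__main__":
    for width, label in ((INT32_SIZE, "INT32"), (INT64_SIZE, "INT64")):
        losing = [n for n in range(1, 10) if index_is_worse("t" * n, width)]
        print(label, "index loses for topic lengths:", losing)
```

With the 25-character average topic length reported in this thread, an index of either width is smaller than the string it replaces.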
> > > > > >> > > > > > >> > > > > > >
> > > > > >> > > > > > >> > > > > > > When compressed:
> > > > > >> > > > > > >> > > > > > > As James Cheng brought up, I'm not sure
> > how
> > > > > >> things
> > > > > >> > > > change
> > > > > >> > > > > > when
> > > > > >> > > > > > >> > > > > > compression
> > > > > >> > > > > > >> > > > > > > comes into the picture. This would be
> > worth
> > > > > >> > > > investigating.
> > > > > >> > > > > > >> > > > > > >
> > > > > >> > > > > > >> > > > > > > On Mon, May 23, 2016 at 4:05 PM, James
> > > Cheng
> > > > <
> > > > > >> > > > > > >> > wushujames@gmail.com
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > > > > > wrote:
> > > > > >> > > > > > >> > > > > > >
> > > > > >> > > > > > >> > > > > > > >
> > > > > >> > > > > > >> > > > > > > > > On May 23, 2016, at 10:59 AM, Jason
> > > > > >> Gustafson <
> > > > > >> > > > > > >> > > > jason@confluent.io>
> > > > > >> > > > > > >> > > > > > > > wrote:
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > > > 2. Maybe there's a better way to
> lay
> > > out
> > > > > the
> > > > > >> > > > > assignment
> > > > > >> > > > > > >> > without
> > > > > >> > > > > > >> > > > > > needing
> > > > > >> > > > > > >> > > > > > > > to
> > > > > >> > > > > > >> > > > > > > > > explicitly repeat the topic? For
> > > example,
> > > > > the
> > > > > >> > > leader
> > > > > >> > > > > > could
> > > > > >> > > > > > >> > sort
> > > > > >> > > > > > >> > > > the
> > > > > >> > > > > > >> > > > > > > > topics
> > > > > >> > > > > > >> > > > > > > > > for each member and just use an
> > integer
> > > > to
> > > > > >> > > represent
> > > > > >> > > > > the
> > > > > >> > > > > > >> > index
> > > > > >> > > > > > >> > > of
> > > > > >> > > > > > >> > > > > > each
> > > > > >> > > > > > >> > > > > > > > > topic within the sorted list (note
> > this
> > > > > >> depends
> > > > > >> > on
> > > > > >> > > > the
> > > > > >> > > > > > >> > > > subscription
> > > > > >> > > > > > >> > > > > > > > > including the full topic list).
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > > > Assignment -> [TopicIndex
> > [Partition]]
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > >
> > > > > >> > > > > > >> > > > > > > > Jason, doesn't gzip (or other
> > > compression)
> > > > > >> > basically
> > > > > >> > > > do
> > > > > >> > > > > > >> this?
> > > > > >> > > > > > >> > If
> > > > > >> > > > > > >> > > > the
> > > > > >> > > > > > >> > > > > > > topic
> > > > > >> > > > > > >> > > > > > > > is a string and the topic is repeated
> > > > > >> throughout,
> > > > > >> > > > won't
> > > > > >> > > > > > >> > > compression
> > > > > >> > > > > > >> > > > > > > > basically replace all repeated
> > instances
> > > of
> > > > > it
> > > > > >> > with
> > > > > >> > > an
> > > > > >> > > > > > index
> > > > > >> > > > > > >> > > > > reference
> > > > > >> > > > > > >> > > > > > to
> > > > > >> > > > > > >> > > > > > > > the full string?
> > > > > >> > > > > > >> > > > > > > >
> > > > > >> > > > > > >> > > > > > > > -James
> > > > > >> > > > > > >> > > > > > > >
> > > > > >> > > > > > >> > > > > > > > > You could even combine these two
> > > options
> > > > so
> > > > > >> that
> > > > > >> > > you
> > > > > >> > > > > > have
> > > > > >> > > > > > >> > only
> > > > > >> > > > > > >> > > 3
> > > > > >> > > > > > >> > > > > > > integers
> > > > > >> > > > > > >> > > > > > > > > for each topic assignment:
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > > > Assignment -> [TopicIndex
> > MinPartition
> > > > > >> > > MaxPartition]
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > > > There may even be better options
> > with a
> > > > > >> little
> > > > > >> > > more
> > > > > >> > > > > > >> thought.
> > > > > >> > > > > > >> > > All
> > > > > >> > > > > > >> > > > of
> > > > > >> > > > > > >> > > > > > > this
> > > > > >> > > > > > >> > > > > > > > is
> > > > > >> > > > > > >> > > > > > > > > just part of the client-side
> > protocol,
> > > so
> > > > > it
> > > > > >> > > > wouldn't
> > > > > >> > > > > > >> require
> > > > > >> > > > > > >> > > any
> > > > > >> > > > > > >> > > > > > > version
> > > > > >> > > > > > >> > > > > > > > > bumps on the broker. What do you
> > think?
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > > > Thanks,
> > > > > >> > > > > > >> > > > > > > > > Jason
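To make the three layouts in this message concrete, here is a rough Python sketch that serializes one member's assignment each way and compares sizes. The encoders are hypothetical stand-ins, not the actual Kafka client serializers: they assume big-endian INT32 array lengths, a 2-byte string length prefix, an INT32 topic index, and (for the range form) that each topic's assigned partitions are contiguous.

```python
import struct


def encode_string(value: str) -> bytes:
    """Length-prefixed UTF-8 string (2-byte length, as for Type.STRING)."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_by_name(assignment: dict) -> bytes:
    """Assignment -> [Topic [Partition]], with the topic name repeated."""
    out = struct.pack(">i", len(assignment))
    for topic, partitions in sorted(assignment.items()):
        out += encode_string(topic)
        out += struct.pack(">i", len(partitions))
        out += b"".join(struct.pack(">i", p) for p in partitions)
    return out


def topic_indexes(subscription: list) -> dict:
    """Position of each topic in the member's sorted subscription list."""
    return {topic: i for i, topic in enumerate(sorted(subscription))}


def encode_by_index(assignment: dict, subscription: list) -> bytes:
    """Assignment -> [TopicIndex [Partition]]."""
    index = topic_indexes(subscription)
    out = struct.pack(">i", len(assignment))
    for topic, partitions in sorted(assignment.items()):
        out += struct.pack(">ii", index[topic], len(partitions))
        out += b"".join(struct.pack(">i", p) for p in partitions)
    return out


def encode_by_range(assignment: dict, subscription: list) -> bytes:
    """Assignment -> [TopicIndex MinPartition MaxPartition]."""
    index = topic_indexes(subscription)
    out = struct.pack(">i", len(assignment))
    for topic, partitions in sorted(assignment.items()):
        low, high = min(partitions), max(partitions)
        # Assumption: the range form only works for contiguous partitions.
        if sorted(partitions) != list(range(low, high + 1)):
            raise ValueError(f"partitions for {topic} are not contiguous")
        out += struct.pack(">iii", index[topic], low, high)
    return out


if __name__ == "__main__":
    # Three 25-character topics with 8 partitions each, mirroring the
    # topic shape described elsewhere in this thread.
    subscription = [f"topic-{i:019d}" for i in range(3)]
    assignment = {topic: list(range(8)) for topic in subscription}
    print("by name :", len(encode_by_name(assignment)), "bytes")
    print("by index:", len(encode_by_index(assignment, subscription)), "bytes")
    print("by range:", len(encode_by_range(assignment, subscription)), "bytes")
```

As the message notes, the index-based forms depend on the subscription carrying the full topic list, since the receiver needs the same sorted list to resolve each index.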
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > > > On Mon, May 23, 2016 at 9:17 AM,
> > > Guozhang
> > > > > >> Wang <
> > > > > >> > > > > > >> > > > wangguoz@gmail.com
> > > > > >> > > > > > >> > > > > >
> > > > > >> > > > > > >> > > > > > > > wrote:
> > > > > >> > > > > > >> > > > > > > > >
> > > > > >> > > > > > >> > > > > > > > >> The original concern is that regex
> > may
> > > > not
> > > > > >> be
> > > > > >> > > > > > efficiently
> > > > > >> > > > > > >> > > > > supported
> > > > > >> > > > > > >> > > > > > > > >> across-languages, but if there is
> a
> > > neat
> > > > > >> > > > workaround I
> > > > > >> > > > > > >> would
> > > > > >> > > > > > >> > > love
> > > > > >> > > > > > >> > > > > to
> > > > > >> > > > > > >> > > > > > > > learn.
> > > > > >> > > > > > >> > > > > > > > >>
> > > > > >> > > > > > >> > > > > > > > >> Guozhang
> > > > > >> > > > > > >> > > > > > > > >>
> > > > > >> > > > > > >> > > > > > > > >> On Mon, May 23, 2016 at 5:31 AM,
> > > Ismael
> > > > > >> Juma <
> > > > > >> > > > > > >> > > ismael@juma.me.uk
> > > > > >> > > > > > >> > > > >
> > > > > >> > > > > > >> > > > > > > wrote:
> > > > > >> > > > > > >> > > > > > > > >>
> > > > > >> > > > > > >> > > > > > > > >>> +1 to Jun's suggestion.
> > > > > >> > > > > > >> > > > > > > > >>>
> > > > > >> > > > > > >> > > > > > > > >>> Having said that, as a general
> > > point, I
> > > > > >> think
> > > > > >> > we
> > > > > >> > > > > > should
> > > > > >> > > > > > >> > > > consider
> > > > > >> > > > > > >> > > > > > > > >> supporting
> > > > > >> > > > > > >> > > > > > > > >>> topic patterns in the wire
> > protocol.
> > > It
> > > > > >> > requires
> > > > > >> > > > > some
> > > > > >> > > > > > >> > > thinking
> > > > > >> > > > > > >> > > > > for
> > > > > >> > > > > > >> > > > > > > > >>> cross-language support, but it
> > seems
> > > > > >> > > surmountable
> > > > > >> > > > > and
> > > > > >> > > > > > it
> > > > > >> > > > > > >> > > could
> > > > > >> > > > > > >> > > > > make
> > > > > >> > > > > > >> > > > > > > > >> certain
> > > > > >> > > > > > >> > > > > > > > >>> operations a lot more efficient
> > (the
> > > > fact
> > > > > >> > that a
> > > > > >> > > > > basic
> > > > > >> > > > > > >> > regex
> > > > > >> > > > > > >> > > > > > > > subscription
> > > > > >> > > > > > >> > > > > > > > >>> causes the consumer to request
> > > metadata
> > > > > for
> > > > > >> > all
> > > > > >> > > > > topics
> > > > > >> > > > > > >> is
> > > > > >> > > > > > >> > not
> > > > > >> > > > > > >> > > > > > great).
> > > > > >> > > > > > >> > > > > > > > >>>
> > > > > >> > > > > > >> > > > > > > > >>> Ismael
> > > > > >> > > > > > >> > > > > > > > >>>
> > > > > >> > > > > > >> > > > > > > > >>> On Sun, May 22, 2016 at 11:49 PM,
> > > > > Guozhang
> > > > > >> > Wang
> > > > > >> > > <
> > > > > >> > > > > > >> > > > > > wangguoz@gmail.com>
> > > > > >> > > > > > >> > > > > > > > >>> wrote:
> > > > > >> > > > > > >> > > > > > > > >>>
> > > > > >> > > > > > >> > > > > > > > >>>> I like Jun's suggestion in
> > changing
> > > > the
> > > > > >> > > handling
> > > > > >> > > > > > >> logic of
> > > > > >> > > > > > >> > > > > single
> > > > > >> > > > > > >> > > > > > > > large
> > > > > >> > > > > > >> > > > > > > > >>>> message on the consumer side.
> > > > > >> > > > > > >> > > > > > > > >>>>
> > > > > >> > > > > > >> > > > > > > > >>>> As for the case of "a single
> group
> > > > > >> > subscribing
> > > > > >> > > to
> > > > > >> > > > > > 3000
> > > > > >> > > > > > >> > > > topics",
> > > > > >> > > > > > >> > > > > > with
> > > > > >> > > > > > >> > > > > > > > >> 100
> > > > > >> > > > > > >> > > > > > > > >>>> consumers the 2.5Mb Gzip size is
> > > > > >> reasonable
> > > > > >> > to
> > > > > >> > > me
> > > > > >> > > > > > (when
> > > > > >> > > > > > >> > > > storing
> > > > > >> > > > > > >> > > > > in
> > > > > >> > > > > > >> > > > > > > ZK,
> > > > > >> > > > > > >> > > > > > > > >> we
> > > > > >> > > > > > >> > > > > > > > >>>> also have the znode limit which
> is
> > > set
> > > > > to
> > > > > >> 1Mb
> > > > > >> > > by
> > > > > >> > > > > > >> default,
> > > > > >> > > > > > >> > > > though
> > > > > >> > > > > > >> > > > > > > > >>> admittedly
> > > > > >> > > > > > >> > > > > > > > >>>> it is only for one consumer).
> And
> > if
> > > > we
> > > > > do
> > > > > >> > the
> > > > > >> > > > > change
> > > > > >> > > > > > >> as
> > > > > >> > > > > > >> > Jun
> > > > > >> > > > > > >> > > > > > > > suggested,
> > > > > >> > > > > > >> > > > > > > > >>>> 2.5Mb on follower's memory
> > pressure
> > > is
> > > > > OK
> > > > > >> I
> > > > > >> > > > think.
> > > > > >> > > > > > >> > > > > > > > >>>>
> > > > > >> > > > > > >> > > > > > > > >>>>
> > > > > >> > > > > > >> > > > > > > > >>>> Guozhang
> > > > > >> > > > > > >> > > > > > > > >>>>
> > > > > >> > > > > > >> > > > > > > > >>>>
> > > > > >> > > > > > >> > > > > > > > >>>> On Sat, May 21, 2016 at 12:51
> PM,
> > > Onur
> > > > > >> > Karaman
> > > > > >> > > <
> > > > > >> > > > > > >> > > > > > > > >>>> onurkaraman.apache@gmail.com
> > > > > >> > > > > > >> > > > > > > > >>>>> wrote:
> > > > > >> > > > > > >> > > > > > > > >>>>
> > > > > >> > > > > > >> > > > > > > > >>>>> Results without compression:
> > > > > >> > > > > > >> > > > > > > > >>>>> 1 consumer 292383 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>> 5 consumers 1079579 bytes * the
> > > > tipping
> > > > > >> > point
> > > > > >> > > > > > >> > > > > > > > >>>>> 10 consumers 1855018 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>> 20 consumers 2780220 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>> 30 consumers 3705422 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>> 40 consumers 4630624 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>> 50 consumers 5555826 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>> 60 consumers 6480788 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>> 70 consumers 7405750 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>> 80 consumers 8330712 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>> 90 consumers 9255674 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>> 100 consumers 10180636 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>> So it looks like gzip
> compression
> > > > > shrinks
> > > > > >> > the
> > > > > >> > > > > > message
> > > > > >> > > > > > >> > size
> > > > > >> > > > > > >> > > by
> > > > > >> > > > > > >> > > > > 4x.
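The 4x figure can be reproduced from the two result sets in this thread. The sketch below copies the uncompressed sizes from this message and the gzip sizes from the earlier message quoted further down; the helper name is hypothetical, and the limit is the 1000012-byte KafkaConfig.messageMaxBytes value mentioned in that earlier message.

```python
MESSAGE_MAX_BYTES = 1000012  # limit cited in the thread

# consumers -> group metadata message size in bytes
UNCOMPRESSED = {
    1: 292383, 5: 1079579, 10: 1855018, 20: 2780220, 30: 3705422,
    40: 4630624, 50: 5555826, 60: 6480788, 70: 7405750, 80: 8330712,
    90: 9255674, 100: 10180636,
}
GZIP = {
    1: 54739, 5: 261524, 10: 459804, 20: 702499, 30: 930525,
    40: 1115657, 50: 1363112, 60: 1598621, 70: 1837359, 80: 2066934,
    90: 2310970, 100: 2542735,
}


def tipping_point(sizes: dict, limit: int = MESSAGE_MAX_BYTES):
    """Smallest measured consumer count whose message exceeds the limit."""
    for consumers in sorted(sizes):
        if sizes[consumers] > limit:
            return consumers
    return None


# Compression ratio at each measured group size.
RATIOS = {n: UNCOMPRESSED[n] / GZIP[n] for n in sorted(GZIP)}

if __name__ == "__main__":
    for n, ratio in RATIOS.items():
        print(f"{n:>3} consumers: {ratio:.2f}x smaller with gzip")
    print("tipping point without compression:", tipping_point(UNCOMPRESSED))
    print("tipping point with gzip:", tipping_point(GZIP))
```

Only the measured group sizes are considered, so the tipping points are the first measured counts over the limit rather than exact thresholds.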
> > > > > >> > > > > > >> > > > > > > > >>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>> On Sat, May 21, 2016 at 9:47
> AM,
> > > Jun
> > > > > Rao
> > > > > >> <
> > > > > >> > > > > > >> > jun@confluent.io
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > > > > > wrote:
> > > > > >> > > > > > >> > > > > > > > >>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>> Onur,
> > > > > >> > > > > > >> > > > > > > > >>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>> Thanks for the investigation.
> > > > > >> > > > > > >> > > > > > > > >>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>> Another option is to just fix
> > how
> > > we
> > > > > >> deal
> > > > > >> > > with
> > > > > >> > > > > the
> > > > > >> > > > > > >> case
> > > > > >> > > > > > >> > > > when a
> > > > > >> > > > > > >> > > > > > > > >>> message
> > > > > >> > > > > > >> > > > > > > > >>>> is
> > > > > >> > > > > > >> > > > > > > > >>>>>> larger than the fetch size.
> > Today,
> > > > if
> > > > > >> the
> > > > > >> > > fetch
> > > > > >> > > > > > size
> > > > > >> > > > > > >> is
> > > > > >> > > > > > >> > > > > smaller
> > > > > >> > > > > > >> > > > > > > > >> than
> > > > > >> > > > > > >> > > > > > > > >>>> the
> > > > > >> > > > > > >> > > > > > > > >>>>>> message size, the consumer will
> > get
> > > > > stuck.
> > > > > >> > > > Instead,
> > > > > >> > > > > > we
> > > > > >> > > > > > >> can
> > > > > >> > > > > > >> > > > > simply
> > > > > >> > > > > > >> > > > > > > > >>> return
> > > > > >> > > > > > >> > > > > > > > >>>>> the
> > > > > >> > > > > > >> > > > > > > > >>>>>> full message if it's larger
> than
> > > the
> > > > > >> fetch
> > > > > >> > > size
> > > > > >> > > > > w/o
> > > > > >> > > > > > >> > > > requiring
> > > > > >> > > > > > >> > > > > > the
> > > > > >> > > > > > >> > > > > > > > >>>>> consumer
> > > > > >> > > > > > >> > > > > > > > >>>>>> to manually adjust the fetch
> > size.
> > > > On
> > > > > >> the
> > > > > >> > > > broker
> > > > > >> > > > > > >> side,
> > > > > >> > > > > > >> > to
> > > > > >> > > > > > >> > > > > serve
> > > > > >> > > > > > >> > > > > > a
> > > > > >> > > > > > >> > > > > > > > >>> fetch
> > > > > >> > > > > > >> > > > > > > > >>>>>> request, we already do an
> index
> > > > lookup
> > > > > >> and
> > > > > >> > > then
> > > > > >> > > > > > scan
> > > > > >> > > > > > >> the
> > > > > >> > > > > > >> > > > log a
> > > > > >> > > > > > >> > > > > > bit
> > > > > >> > > > > > >> > > > > > > > >> to
> > > > > >> > > > > > >> > > > > > > > >>>>> find
> > > > > >> > > > > > >> > > > > > > > >>>>>> the message with the requested
> > > > offset.
> > > > > >> We
> > > > > >> > can
> > > > > >> > > > > just
> > > > > >> > > > > > >> check
> > > > > >> > > > > > >> > > the
> > > > > >> > > > > > >> > > > > > size
> > > > > >> > > > > > >> > > > > > > > >> of
> > > > > >> > > > > > >> > > > > > > > >>>> that
> > > > > >> > > > > > >> > > > > > > > >>>>>> message and return the full
> > > message
> > > > if
> > > > > >> its
> > > > > >> > > size
> > > > > >> > > > > is
> > > > > >> > > > > > >> > larger
> > > > > >> > > > > > >> > > > than
> > > > > >> > > > > > >> > > > > > the
> > > > > >> > > > > > >> > > > > > > > >>>> fetch
> > > > > >> > > > > > >> > > > > > > > >>>>>> size. This way, fetch size is
> > > really
> > > > > for
> > > > > >> > > > > > performance
> > > > > >> > > > > > >> > > > > > optimization,
> > > > > >> > > > > > >> > > > > > > > >>> i.e.
> > > > > >> > > > > > >> > > > > > > > >>>>> in
> > > > > >> > > > > > >> > > > > > > > >>>>>> the common case, we will not
> > > return
> > > > > more
> > > > > >> > > bytes
> > > > > >> > > > > than
> > > > > >> > > > > > >> > fetch
> > > > > >> > > > > > >> > > > > size,
> > > > > >> > > > > > >> > > > > > > but
> > > > > >> > > > > > >> > > > > > > > >>> if
> > > > > >> > > > > > >> > > > > > > > >>>>>> there is a large message, we
> > will
> > > > > return
> > > > > >> > more
> > > > > >> > > > > bytes
> > > > > >> > > > > > >> than
> > > > > >> > > > > > >> > > the
> > > > > >> > > > > > >> > > > > > > > >>> specified
> > > > > >> > > > > > >> > > > > > > > >>>>>> fetch size. In practice, large
> > > > > messages
> > > > > >> are
> > > > > >> > > > rare.
> > > > > >> > > > > > >> So, it
> > > > > >> > > > > > >> > > > > > shouldn't
> > > > > >> > > > > > >> > > > > > > > >>>>> increase
> > > > > >> > > > > > >> > > > > > > > >>>>>> the memory consumption on the
> > > client
> > > > > too
> > > > > >> > > much.
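The fetch behavior proposed in this message can be sketched as follows. This is a toy model, not broker code: the log is a plain list of (offset, payload) pairs, and serve_fetch is a hypothetical name. It shows only the rule being suggested, namely that fetch size caps the response in the common case but the first message is returned whole when it alone exceeds the fetch size.

```python
from typing import List, Tuple

Message = Tuple[int, bytes]  # (offset, payload); a stand-in for a log entry


def serve_fetch(log: List[Message], fetch_offset: int,
                fetch_size: int) -> List[Message]:
    """Return messages from fetch_offset onward, up to fetch_size bytes.

    If the first message at or after fetch_offset is larger than
    fetch_size, it is returned on its own so the consumer can progress.
    """
    result: List[Message] = []
    used = 0
    for offset, payload in log:
        if offset < fetch_offset:
            continue  # the real broker would use an index lookup here
        if used + len(payload) > fetch_size:
            if not result:
                # Oversized first message: return it in full anyway.
                result.append((offset, payload))
            break
        result.append((offset, payload))
        used += len(payload)
    return result


if __name__ == "__main__":
    log = [(0, b"a" * 10), (1, b"b" * 100), (2, b"c" * 10)]
    for start in range(3):
        batch = serve_fetch(log, start, fetch_size=50)
        print(start, [(o, len(p)) for o, p in batch])
```

Under this rule a consumer never gets stuck on a large message, at the cost of occasionally receiving more bytes than it asked for.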
> > > > > >> > > > > > >> > > > > > > > >>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>> Jun
> > > > > >> > > > > > >> > > > > > > > >>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>> On Sat, May 21, 2016 at 3:34
> AM,
> > > > Onur
> > > > > >> > > Karaman <
> > > > > >> > > > > > >> > > > > > > > >>>>>> onurkaraman.apache@gmail.com>
> > > > > >> > > > > > >> > > > > > > > >>>>>> wrote:
> > > > > >> > > > > > >> > > > > > > > >>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Hey everyone. So I started
> > doing
> > > > some
> > > > > >> > tests
> > > > > >> > > on
> > > > > >> > > > > the
> > > > > >> > > > > > >> new
> > > > > >> > > > > > >> > > > > > > > >>>>>> consumer/coordinator
> > > > > >> > > > > > >> > > > > > > > >>>>>>> to see if it could handle
> more
> > > > > >> strenuous
> > > > > >> > use
> > > > > >> > > > > cases
> > > > > >> > > > > > >> like
> > > > > >> > > > > > >> > > > > > mirroring
> > > > > >> > > > > > >> > > > > > > > >>>>>> clusters
> > > > > >> > > > > > >> > > > > > > > >>>>>>> with thousands of topics and
> > > > thought
> > > > > >> I'd
> > > > > >> > > share
> > > > > >> > > > > > >> > whatever I
> > > > > >> > > > > > >> > > > > have
> > > > > >> > > > > > >> > > > > > so
> > > > > >> > > > > > >> > > > > > > > >>>> far.
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> The scalability limit: the
> > amount
> > > > of
> > > > > >> group
> > > > > >> > > > > > metadata
> > > > > >> > > > > > >> we
> > > > > >> > > > > > >> > > can
> > > > > >> > > > > > >> > > > > fit
> > > > > >> > > > > > >> > > > > > > > >> into
> > > > > >> > > > > > >> > > > > > > > >>>> one
> > > > > >> > > > > > >> > > > > > > > >>>>>>> message
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Some background:
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Client-side assignment is
> > > > implemented
> > > > > >> in
> > > > > >> > two
> > > > > >> > > > > > phases
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 1. a PreparingRebalance phase
> > > that
> > > > > >> > > identifies
> > > > > >> > > > > > >> members
> > > > > >> > > > > > >> > of
> > > > > >> > > > > > >> > > > the
> > > > > >> > > > > > >> > > > > > > > >> group
> > > > > >> > > > > > >> > > > > > > > >>>> and
> > > > > >> > > > > > >> > > > > > > > >>>>>>> aggregates member
> > subscriptions.
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 2. an AwaitingSync phase that
> > > waits
> > > > > for
> > > > > >> > the
> > > > > >> > > > > group
> > > > > >> > > > > > >> > leader
> > > > > >> > > > > > >> > > to
> > > > > >> > > > > > >> > > > > > > > >> decide
> > > > > >> > > > > > >> > > > > > > > >>>>> member
> > > > > >> > > > > > >> > > > > > > > >>>>>>> assignments based on the
> member
> > > > > >> > > subscriptions
> > > > > >> > > > > > across
> > > > > >> > > > > > >> > the
> > > > > >> > > > > > >> > > > > group.
> > > > > >> > > > > > >> > > > > > > > >>>>>>>  - The leader announces this
> > > > decision
> > > > > >> > with a
> > > > > >> > > > > > >> > > > > SyncGroupRequest.
> > > > > >> > > > > > >> > > > > > > > >> The
> > > > > >> > > > > > >> > > > > > > > >>>>>>> GroupCoordinator handles
> > > > > >> SyncGroupRequests
> > > > > >> > > by
> > > > > >> > > > > > >> appending
> > > > > >> > > > > > >> > > all
> > > > > >> > > > > > >> > > > > > group
> > > > > >> > > > > > >> > > > > > > > >>>> state
> > > > > >> > > > > > >> > > > > > > > >>>>>>> into a single message under
> the
> > > > > >> > > > > __consumer_offsets
> > > > > >> > > > > > >> > topic.
> > > > > >> > > > > > >> > > > > This
> > > > > >> > > > > > >> > > > > > > > >>>> message
> > > > > >> > > > > > >> > > > > > > > >>>>> is
> > > > > >> > > > > > >> > > > > > > > >>>>>>> keyed on the group id and
> > > contains
> > > > > each
> > > > > >> > > member
> > > > > >> > > > > > >> > > subscription
> > > > > >> > > > > > >> > > > > as
> > > > > >> > > > > > >> > > > > > > > >> well
> > > > > >> > > > > > >> > > > > > > > >>>> as
> > > > > >> > > > > > >> > > > > > > > >>>>>> the
> > > > > >> > > > > > >> > > > > > > > >>>>>>> decided assignment for each
> > > member.
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> The environment:
> > > > > >> > > > > > >> > > > > > > > >>>>>>> - one broker
> > > > > >> > > > > > >> > > > > > > > >>>>>>> - one __consumer_offsets
> > > partition
> > > > > >> > > > > > >> > > > > > > > >>>>>>> -
> > > offsets.topic.compression.codec=1
> > > > > //
> > > > > >> > this
> > > > > >> > > is
> > > > > >> > > > > > gzip
> > > > > >> > > > > > >> > > > > > > > >>>>>>> - broker has my pending
> > > KAFKA-3718
> > > > > >> patch
> > > > > >> > > that
> > > > > >> > > > > > >> actually
> > > > > >> > > > > > >> > > > makes
> > > > > >> > > > > > >> > > > > > use
> > > > > >> > > > > > >> > > > > > > > >> of
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > offsets.topic.compression.codec:
> > > > > >> > > > > > >> > > > > > > > >>>>>>
> > > > > >> https://github.com/apache/kafka/pull/1394
> > > > > >> > > > > > >> > > > > > > > >>>>>>> - around 3000 topics. This is
> > an
> > > > > actual
> > > > > >> > > subset
> > > > > >> > > > > of
> > > > > >> > > > > > >> > topics
> > > > > >> > > > > > >> > > > from
> > > > > >> > > > > > >> > > > > > one
> > > > > >> > > > > > >> > > > > > > > >>> of
> > > > > >> > > > > > >> > > > > > > > >>>>> our
> > > > > >> > > > > > >> > > > > > > > >>>>>>> clusters.
> > > > > >> > > > > > >> > > > > > > > >>>>>>> - topics have 8 partitions
> > > > > >> > > > > > >> > > > > > > > >>>>>>> - topics are 25 characters
> long
> > > on
> > > > > >> average
> > > > > >> > > > > > >> > > > > > > > >>>>>>> - one group with a varying
> > number
> > > > of
> > > > > >> > > consumers
> > > > > >> > > > > > each
> > > > > >> > > > > > >> > > > hardcoded
> > > > > >> > > > > > >> > > > > > > > >> with
> > > > > >> > > > > > >> > > > > > > > >>>> all
> > > > > >> > > > > > >> > > > > > > > >>>>>> the
> > > > > >> > > > > > >> > > > > > > > >>>>>>> topics just to make the tests
> > > more
> > > > > >> > > consistent.
> > > > > >> > > > > > >> > > wildcarding
> > > > > >> > > > > > >> > > > > with
> > > > > >> > > > > > >> > > > > > > > >> .*
> > > > > >> > > > > > >> > > > > > > > >>>>> should
> > > > > >> > > > > > >> > > > > > > > >>>>>>> have the same effect once the
> > > > > >> subscription
> > > > > >> > > > hits
> > > > > >> > > > > > the
> > > > > >> > > > > > >> > > > > coordinator
> > > > > >> > > > > > >> > > > > > > > >> as
> > > > > >> > > > > > >> > > > > > > > >>>> the
> > > > > >> > > > > > >> > > > > > > > >>>>>>> subscription has already been
> > > fully
> > > > > >> > expanded
> > > > > >> > > > out
> > > > > >> > > > > > to
> > > > > >> > > > > > >> the
> > > > > >> > > > > > >> > > > list
> > > > > >> > > > > > >> > > > > of
> > > > > >> > > > > > >> > > > > > > > >>>> topics
> > > > > >> > > > > > >> > > > > > > > >>>>> by
> > > > > >> > > > > > >> > > > > > > > >>>>>>> the consumers.
> > > > > >> > > > > > >> > > > > > > > >>>>>>> - I added some log messages
> to
> > > > > >> Log.scala
> > > > > >> > to
> > > > > >> > > > > print
> > > > > >> > > > > > >> out
> > > > > >> > > > > > >> > the
> > > > > >> > > > > > >> > > > > > message
> > > > > >> > > > > > >> > > > > > > > >>>> sizes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> after compression
> > > > > >> > > > > > >> > > > > > > > >>>>>>> - there are no producers at
> all
> > > and
> > > > > >> auto
> > > > > >> > > > commits
> > > > > >> > > > > > are
> > > > > >> > > > > > >> > > > > disabled.
> > > > > >> > > > > > >> > > > > > > > >> The
> > > > > >> > > > > > >> > > > > > > > >>>> only
> > > > > >> > > > > > >> > > > > > > > >>>>>>> topic with messages getting
> > added
> > > > is
> > > > > >> the
> > > > > >> > > > > > >> > > __consumer_offsets
> > > > > >> > > > > > >> > > > > > topic
> > > > > >> > > > > > >> > > > > > > > >>> and
> > > > > >> > > > > > >> > > > > > > > >>>>>>> they're only from storing
> group
> > > > > >> metadata
> > > > > >> > > while
> > > > > >> > > > > > >> > processing
> > > > > >> > > > > > >> > > > > > > > >>>>>>> SyncGroupRequests.
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Results:
> > > > > >> > > > > > >> > > > > > > > >>>>>>> The results below show that
> we
> > > > exceed
> > > > > >> the
> > > > > >> > > > > 1000012
> > > > > >> > > > > > >> byte
> > > > > >> > > > > > >> > > > > > > > >>>>>>> KafkaConfig.messageMaxBytes
> > limit
> > > > > >> > relatively
> > > > > >> > > > > > quickly
> > > > > >> > > > > > >> > > > (between
> > > > > >> > > > > > >> > > > > > > > >> 30-40
> > > > > >> > > > > > >> > > > > > > > >>>>>>> consumers):
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 1 consumer 54739 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 5 consumers 261524 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 10 consumers 459804 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 20 consumers 702499 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 30 consumers 930525 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 40 consumers 1115657 bytes *
> > the
> > > > > >> tipping
> > > > > >> > > point
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 50 consumers 1363112 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 60 consumers 1598621 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 70 consumers 1837359 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 80 consumers 2066934 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 90 consumers 2310970 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 100 consumers 2542735 bytes
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Note that the growth itself
> is
> > > > pretty
> > > > > >> > > gradual.
> > > > > >> > > > > > >> Plotting
> > > > > >> > > > > > >> > > the
> > > > > >> > > > > > >> > > > > > > > >> points
> > > > > >> > > > > > >> > > > > > > > >>>>> makes
> > > > > >> > > > > > >> > > > > > > > >>>>>> it
> > > > > >> > > > > > >> > > > > > > > >>>>>>> look roughly linear w.r.t the
> > > > number
> > > > > of
> > > > > >> > > > > consumers:
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
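The roughly linear growth can also be checked without the plot. The sketch below fits an ordinary least-squares line to the gzip sizes listed above and estimates where that line crosses the 1000012-byte limit; the function name is hypothetical, and a straight-line fit is an assumption made here for illustration, not something the measurements guarantee.

```python
MESSAGE_MAX_BYTES = 1000012  # limit cited in the thread

# (consumers, compressed message size in bytes) from the results above
POINTS = [
    (1, 54739), (5, 261524), (10, 459804), (20, 702499), (30, 930525),
    (40, 1115657), (50, 1363112), (60, 1598621), (70, 1837359),
    (80, 2066934), (90, 2310970), (100, 2542735),
]


def least_squares(points):
    """Ordinary least-squares fit; returns (slope, intercept)."""
    n = len(points)
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    sxx = sum((x - mean_x) ** 2 for x, _ in points)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in points)
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


SLOPE, INTERCEPT = least_squares(POINTS)
# Consumer count at which the fitted line reaches the message size limit.
CROSSOVER = (MESSAGE_MAX_BYTES - INTERCEPT) / SLOPE

if __name__ == "__main__":
    print(f"about {SLOPE:.0f} bytes per additional consumer")
    print(f"fitted line crosses the limit near {CROSSOVER:.1f} consumers")
```

The fitted crossover falls between 30 and 40 consumers, which matches the range reported in the results.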
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Also note that these numbers
> > > aren't
> > > > > >> > averages
> > > > > >> > > > or
> > > > > >> > > > > > >> medians
> > > > > >> > > > > > >> > > or
> > > > > >> > > > > > >> > > > > > > > >> anything
> > > > > >> > > > > > >> > > > > > > > >>>>> like
> > > > > >> > > > > > >> > > > > > > > >>>>>>> that. It's just the byte size
> > > from
> > > > a
> > > > > >> given
> > > > > >> > > > run.
> > > > > >> > > > > I
> > > > > >> > > > > > >> did
> > > > > >> > > > > > >> > run
> > > > > >> > > > > > >> > > > > them
> > > > > >> > > > > > >> > > > > > a
> > > > > >> > > > > > >> > > > > > > > >>> few
> > > > > >> > > > > > >> > > > > > > > >>>>>> times
> > > > > >> > > > > > >> > > > > > > > >>>>>>> and saw similar results.
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Impact:
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Even after adding gzip to the __consumer_offsets topic with my
> > > > > >> > > > > > >> > > > > > > > >>>>>>> pending KAFKA-3718 patch, the AwaitingSync phase of the group fails
> > > > > >> > > > > > >> > > > > > > > >>>>>>> with RecordTooLargeException. This means the combined size of each
> > > > > >> > > > > > >> > > > > > > > >>>>>>> member's subscriptions and assignments exceeded the
> > > > > >> > > > > > >> > > > > > > > >>>>>>> KafkaConfig.messageMaxBytes of 1000012 bytes. The group ends up dying.
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Options:
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 1. Config change: reduce the number of consumers in the group. This
> > > > > >> > > > > > >> > > > > > > > >>>>>>> isn't always a realistic answer in more strenuous use cases like
> > > > > >> > > > > > >> > > > > > > > >>>>>>> MirrorMaker clusters or for auditing.
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 2. Config change: split the group into smaller groups which together
> > > > > >> > > > > > >> > > > > > > > >>>>>>> will get full coverage of the topics. This gives each group member a
> > > > > >> > > > > > >> > > > > > > > >>>>>>> smaller subscription (ex: g1 has topics starting with a-m while g2
> > > > > >> > > > > > >> > > > > > > > >>>>>>> has topics starting with n-z). This would be operationally painful
> > > > > >> > > > > > >> > > > > > > > >>>>>>> to manage.
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 3. Config change: split the topics among members of the group. Again
> > > > > >> > > > > > >> > > > > > > > >>>>>>> this gives each group member a smaller subscription. This would also
> > > > > >> > > > > > >> > > > > > > > >>>>>>> be operationally painful to manage.
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 4. Config change: bump up KafkaConfig.messageMaxBytes (a topic-level
> > > > > >> > > > > > >> > > > > > > > >>>>>>> config) and KafkaConfig.replicaFetchMaxBytes (a broker-level config).
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Applying messageMaxBytes to just the __consumer_offsets topic seems
> > > > > >> > > > > > >> > > > > > > > >>>>>>> relatively harmless, but bumping up the broker-level
> > > > > >> > > > > > >> > > > > > > > >>>>>>> replicaFetchMaxBytes would probably need more attention.
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 5. Config change: try different compression codecs. Based on 2
> > > > > >> > > > > > >> > > > > > > > >>>>>>> minutes of googling, it seems like lz4 and snappy are faster than
> > > > > >> > > > > > >> > > > > > > > >>>>>>> gzip but have worse compression, so this probably won't help.
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 6. Implementation change: support sending the regex over the wire
> > > > > >> > > > > > >> > > > > > > > >>>>>>> instead of the fully expanded topic subscriptions. I think people
> > > > > >> > > > > > >> > > > > > > > >>>>>>> said in the past that different languages have subtle differences in
> > > > > >> > > > > > >> > > > > > > > >>>>>>> regex, so this doesn't play nicely with cross-language groups.
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 7. Implementation change: maybe we can reverse the mapping? Instead
> > > > > >> > > > > > >> > > > > > > > >>>>>>> of mapping from member to subscriptions, we can map a subscription
> > > > > >> > > > > > >> > > > > > > > >>>>>>> to a list of members.
> > > > > >> > > > > > >> > > > > > > > >>>>>>> 8. Implementation change: maybe we can try to break apart the
> > > > > >> > > > > > >> > > > > > > > >>>>>>> subscription and assignments from the same SyncGroupRequest into
> > > > > >> > > > > > >> > > > > > > > >>>>>>> multiple records? They can still go to the same message set and get
> > > > > >> > > > > > >> > > > > > > > >>>>>>> appended together. This way the limit becomes the segment size, which
> > > > > >> > > > > > >> > > > > > > > >>>>>>> shouldn't be a problem. This can be tricky to get right because we're
> > > > > >> > > > > > >> > > > > > > > >>>>>>> currently keying these messages on the group, so I think records from
> > > > > >> > > > > > >> > > > > > > > >>>>>>> the same rebalance might accidentally compact one another, but my
> > > > > >> > > > > > >> > > > > > > > >>>>>>> understanding of compaction isn't that great.
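To make option 7 concrete, here is a minimal in-memory sketch of the reversed mapping. It is not Kafka's group metadata format or any coordinator code; `invert_subscriptions` and the member and topic names are hypothetical. The point is only that when many members share the same topic list, that list is stored once instead of once per member.

```python
from collections import defaultdict


def invert_subscriptions(member_subscriptions):
    """Map each distinct subscription (set of topics) to the members using it.

    member_subscriptions: dict of member id -> iterable of topic names.
    Returns a dict of frozenset(topics) -> list of member ids.
    """
    by_subscription = defaultdict(list)
    for member_id, topics in member_subscriptions.items():
        # frozenset makes the topic list hashable and order-insensitive.
        by_subscription[frozenset(topics)].append(member_id)
    return dict(by_subscription)


# Hypothetical group: two members share a subscription, one differs.
members = {
    "consumer-1": ["topic-a", "topic-b"],
    "consumer-2": ["topic-b", "topic-a"],
    "consumer-3": ["topic-c"],
}
grouped = invert_subscriptions(members)
for topics, ids in grouped.items():
    print(sorted(topics), "->", ids)
```

With identical subscriptions across the group, the number of stored topic lists drops from one per member to one in total; the saving shrinks when subscriptions differ between members.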
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> Todo:
> > > > > >> > > > > > >> > > > > > > > >>>>>>> It would be interesting to rerun the tests with no compression just
> > > > > >> > > > > > >> > > > > > > > >>>>>>> to see how much gzip is helping but it's getting late. Maybe tomorrow?
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>> - Onur
> > > > > >> > > > > > >> > > > > > > > >>>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>>
> > > > > >> > > > > > >> > > > > > > > >>>>
> > > > > >> > > > > > >> > > > > > > > >>>>
> > > > > >> > > > > > >> > > > > > > > >>>>
> > > > > >> > > > > > >> > > > > > > > >>>> --
> > > > > >> > > > > > >> > > > > > > > >>>> -- Guozhang
