kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Boyang Chen <reluctanthero...@gmail.com>
Subject Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics
Date Tue, 25 Jun 2019 23:36:50 GMT
Thank you for the context Colin. The groupId was indeed a copy-paste error.
Our use case here for 447 is (Quoted from Guozhang):
'''
I think if we can do something else to
avoid this config though, for example we can use the embedded AdminClient
to send the APIVersion request upon starting up, and based on the returned
value decides whether to go to the old code path or the new behavior.
'''
The benefit we get is to avoid adding a new configuration to make a
decision simply base on broker version. If you have concerns with exposing
ApiVersion for client, we could
try to think of alternative solutions too.

Boyang



On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmccabe@apache.org> wrote:

> kafka.api.ApiVersion is an internal class, not suitable to exposing
> through AdminClient.  That class is not even accessible without having the
> broker jars on your CLASSPATH.
>
> Another question is, what is the groupId parameter doing in the call?  The
> API versions are the same no matter what consumer group we use, right?
> Perhaps this was a copy and paste error?
>
> This is not the first time we have discussed having a method in
> AdminClient to retrieve API version information.  In fact, the original KIP
> which created KafkaAdminClient specified an API for fetching version
> information.  It was called apiVersions and it is still there on the wiki.
> See
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
>
> However, this API wasn't ready in time for 0.11.0 so we shipped without
> it.  There was a JIRA to implement it for later versions,
> https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR,
> https://github.com/apache/kafka/pull/3012 .  However, we started to
> rethink whether this AdminClient function was even necessary.  Most of the
> use-cases we could think of seemed like horrible hacks.  So it has never
> really been implemented (yet?).
>
> best,
> Colin
>
>
> On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > Actually, after a second thought, I think it actually makes sense to
> > support auto upgrade through admin client to help use get api version
> > from
> > broker.
> > A draft KIP is here:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> >
> > Boyang
> >
> > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <reluctanthero104@gmail.com>
> > wrote:
> >
> > > Thank you Guozhang, some of my understandings are inline below.
> > >
> > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <jason@confluent.io>
> > > wrote:
> > >
> > >> >
> > >> > I think co-locating does have some merits here, i.e. letting the
> > >> > ConsumerCoordinator which has the source-of-truth of assignment to
> act
> > >> as
> > >> > the TxnCoordinator as well; but I agree there's also some cons of
> > >> coupling
> > >> > them together. I'm still a bit inclining towards colocation but if
> there
> > >> > are good rationales not to do so I can be convinced as well.
> > >>
> > >>
> > >> The good rationale is that we have no mechanism to colocate
> partitions ;).
> > >> Are you suggesting we store the group and transaction state in the
> same
> > >> log? Can you be more concrete about the benefit?
> > >>
> > >> -Jason
> > >>
> > >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangguoz@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi Boyang,
> > >> >
> > >> > 1. One advantage of retry against on-hold is that it will not
> tie-up a
> > >> > handler thread (of course the latter could do the same but that
> involves
> > >> > using a purgatory which is more complicated), and also it is less
> > >> likely to
> > >> > violate request timeout. So I think there are some rationales to
> prefer
> > >> > retries.
> > >> >
> > >>
> > >  That sounds fair to me, also we are avoiding usage of another
> purgatory
> > > instance. Usually for one back-off
> > > we are only delaying 50ms during startup which is trivial cost. This
> > > behavior shouldn't be changed.
> > >
> > > > 2. Regarding "ConsumerRebalanceListener": both
> ConsumerRebalanceListener
> > >> > and PartitionAssignors are user-customizable modules, and only
> > >> difference
> > >> > is that the former is specified via code and the latter is
> specified via
> > >> > config.
> > >> >
> > >> > Regarding Jason's proposal of ConsumerAssignment, one thing to note
> > >> though
> > >> > with KIP-429 the onPartitionAssigned may not be called if the
> assignment
> > >> > does not change, whereas onAssignment would always be called at the
> end
> > >> of
> > >> > sync-group response. My proposed semantics is that
> > >> > `RebalanceListener#onPartitionsXXX` are used for notifications to
> user,
> > >> and
> > >> > hence if there's no changes these will not be called, whereas
> > >> > `PartitionAssignor` is used for assignor logic, whose callback would
> > >> always
> > >> > be called no matter if the partitions have changed or not.
> > >>
> > >> I think a third option is to gracefully expose generation id as part
> of
> > > consumer API, so that we don't need to
> > > bother overloading various callbacks. Of course, this builds upon the
> > > assumption that topic partitions
> > > will not be included in new initTransaction API.
> > >
> > > > 3. I feel it is a bit awkward to let the TxnCoordinator keeping
> partition
> > >> > assignments since it is sort of taking over the job of the
> > >> > ConsumerCoordinator, and may likely cause a split-brain problem as
> two
> > >> > coordinators keep a copy of this assignment which may be different.
> > >> >
> > >> > I think co-locating does have some merits here, i.e. letting the
> > >> > ConsumerCoordinator which has the source-of-truth of assignment to
> act
> > >> as
> > >> > the TxnCoordinator as well; but I agree there's also some cons of
> > >> coupling
> > >> > them together. I'm still a bit inclining towards colocation but if
> there
> > >> > are good rationales not to do so I can be convinced as well.
> > >> >
> > >>
> > > The purpose of co-location is to let txn coordinator see the group
> > > assignment. This priority is weakened
> > > when we already have defense on the consumer offset fetch, so I guess
> it's
> > > not super important anymore.
> > >
> > >
> > >> > 4. I guess I'm preferring the philosophy of "only add configs if
> > >> there's no
> > >> > other ways", since more and more configs would make it less and less
> > >> > intuitive out of the box to use.
> > >> >
> > >> > I think it's a valid point that checks upon starting up does not
> cope
> > >> with
> > >> > brokers downgrading but even with a config, but it is still hard for
> > >> users
> > >> > to determine when they can be ensured the broker would never
> downgrade
> > >> > anymore and hence can safely switch the config. So my feeling is
> that
> > >> this
> > >> > config would not be helping too much still. If we want to be at the
> > >> safer
> > >> > side, then I'd suggest we modify the Coordinator -> NetworkClient
> > >> hierarchy
> > >> > to allow the NetworkClient being able to pass the APIVersion
> metadata to
> > >> > Coordinator, so that Coordinator can rely on that logic to change
> its
> > >> > behavior dynamically.
> > >>
> > > The stream thread init could not be supported by a client coordinator
> > > behavior change on the fly,
> > > we are only losing possibilities after we initialized. (main thread
> gets
> > > exit and no thread has global picture anymore)
> > > If we do want to support auto version detection, admin client request
> in
> > > this sense shall be easier.
> > >
> > >
> > >> >
> > >> > 5. I do not have a concrete idea about how the impact on Connect
> would
> > >> > make, maybe Randall or Konstantine can help here?
> > >> >
> > >>
> > > Sounds good, let's see their thoughts.
> > >
> > >
> > >> > Guozhang
> > >> >
> > >> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <
> > >> reluctanthero104@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > Hey Jason,
> > >> > >
> > >> > > thank you for the proposal here. Some of my thoughts below.
> > >> > >
> > >> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <
> jason@confluent.io>
> > >> > > wrote:
> > >> > >
> > >> > > > Hi Boyang,
> > >> > > >
> > >> > > > Thanks for picking this up! Still reading through the updates,
> but
> > >> here
> > >> > > are
> > >> > > > a couple initial comments on the APIs:
> > >> > > >
> > >> > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think
we
> are
> > >> > > trying
> > >> > > > to encapsulate state from the current group assignment.
Maybe
> > >> something
> > >> > > > like `ConsumerAssignment` would be clearer? If we make the
usage
> > >> > > consistent
> > >> > > > across the consumer and producer, then we can avoid exposing
> > >> internal
> > >> > > state
> > >> > > > like the generationId.
> > >> > > >
> > >> > > > For example:
> > >> > > >
> > >> > > > // Public API
> > >> > > > interface ConsumerAssignment {
> > >> > > >   Set<TopicPartition> partittions();
> > >> > > > }
> > >> > > >
> > >> > > > // Not a public API
> > >> > > > class InternalConsumerAssignment implements ConsumerAssignment
{
> > >> > > >   Set<TopicPartition> partittions;
> > >> > > >   int generationId;
> > >> > > > }
> > >> > > >
> > >> > > > Then we can change the rebalance listener to something like
> this:
> > >> > > > onPartitionsAssigned(ConsumerAssignment assignment)
> > >> > > >
> > >> > > > And on the producer:
> > >> > > > void initTransactions(String groupId, ConsumerAssignment
> > >> assignment);
> > >> > > >
> > >> > > > 2. Another bit of awkwardness is the fact that we have to
pass
> the
> > >> > > groupId
> > >> > > > through both initTransactions() and sendOffsetsToTransaction().
> We
> > >> > could
> > >> > > > consider a config instead. Maybe something like `
> > >> > transactional.group.id
> > >> > > `?
> > >> > > > Then we could simplify the producer APIs, potentially even
> > >> deprecating
> > >> > > the
> > >> > > > current sendOffsetsToTransaction. In fact, for this new
usage,
> the `
> > >> > > > transational.id` config is not needed. It would be nice
if we
> don't
> > >> > have
> > >> > > > to
> > >> > > > provide it.
> > >> > > >
> > >> > > > I like the idea of combining 1 and 2. We could definitely
pass
> in a
> > >> > > group.id config
> > >> > > so that we could avoid exposing that information in a public
API.
> The
> > >> > > question I have
> > >> > > is that whether we should name the interface `GroupAssignment`
> > >> instead,
> > >> > so
> > >> > > that Connect later
> > >> > > could also extend on the same interface, just to echo Guozhang's
> point
> > >> > > here, Also the base interface
> > >> > > is better to be defined empty for easy extension, or define an
> > >> abstract
> > >> > > type called `Resource` to be shareable
> > >> > > later IMHO.
> > >> > >
> > >> > >
> > >> > > > By the way, I'm a bit confused about discussion above about
> > >> colocating
> > >> > > the
> > >> > > > txn and group coordinators. That is not actually necessary,
is
> it?
> > >> > > >
> > >> > > > Yes, this is not a requirement for this KIP, because it
is
> > >> inherently
> > >> > > impossible to
> > >> > > achieve co-locating  topic partition of transaction log and
> consumed
> > >> > offset
> > >> > > topics.
> > >> > >
> > >> > >
> > >> > > > Thanks,
> > >> > > > Jason
> > >> > > >
> > >> > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <
> > >> reluctanthero104@gmail.com
> > >> > >
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Thank you Ismael for the suggestion. We will attempt
to
> address
> > >> it by
> > >> > > > > giving more details to rejected alternative section.
> > >> > > > >
> > >> > > > >
> > >> > > > > Thank you for the comment Guozhang! Answers are inline
below.
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <
> wangguoz@gmail.com
> > >> >
> > >> > > > wrote:
> > >> > > > >
> > >> > > > > > Hello Boyang,
> > >> > > > > >
> > >> > > > > > Thanks for the KIP, I have some comments below:
> > >> > > > > >
> > >> > > > > > 1. "Once transactions are complete, the call will
return."
> This
> > >> > seems
> > >> > > > > > different from the existing behavior, in which
we would
> return a
> > >> > > > > retriable
> > >> > > > > > CONCURRENT_TRANSACTIONS and let the client to
retry, is this
> > >> > > > intentional?
> > >> > > > > >
> > >> > > > >
> > >> > > > > I don’t think it is intentional, and I will defer
this
> question to
> > >> > > Jason
> > >> > > > > when he got time to answer since from what I understood
retry
> and
> > >> on
> > >> > > hold
> > >> > > > > seem both valid approaches.
> > >> > > > >
> > >> > > > >
> > >> > > > > > 2. "an overload to onPartitionsAssigned in the
consumer's
> > >> rebalance
> > >> > > > > > listener interface": as part of KIP-341 we've
already add
> this
> > >> > > > > information
> > >> > > > > > to the onAssignment callback. Would this be sufficient?
Or
> more
> > >> > > > generally
> > >> > > > > > speaking, which information have to be passed
around in
> > >> rebalance
> > >> > > > > callback
> > >> > > > > > while others can be passed around in PartitionAssignor
> > >> callback? In
> > >> > > > > Streams
> > >> > > > > > for example both callbacks are used but most critical
> > >> information
> > >> > is
> > >> > > > > passed
> > >> > > > > > via onAssignment.
> > >> > > > > >
> > >> > > > >
> > >> > > > > We still need to extend ConsumerRebalanceListener because
> it’s the
> > >> > > > > interface we could have public access to. The #onAssignment
> call
> > >> is
> > >> > > > defined
> > >> > > > > on PartitionAssignor level which is not easy to work
with
> external
> > >> > > > > producers.
> > >> > > > >
> > >> > > > >
> > >> > > > > > 3. "We propose to use a separate record type in
order to
> store
> > >> the
> > >> > > > group
> > >> > > > > > assignment.": hmm, I thought with the third typed
> > >> FindCoordinator,
> > >> > > the
> > >> > > > > same
> > >> > > > > > broker that act as the  consumer coordinator would
always be
> > >> > selected
> > >> > > > as
> > >> > > > > > the txn coordinator, in which case it can access
its local
> cache
> > >> > > > > metadata /
> > >> > > > > > offset topic to get this information already?
We just need
> to
> > >> think
> > >> > > > about
> > >> > > > > > how to make these two modules directly exchange
information
> > >> without
> > >> > > > > messing
> > >> > > > > > up the code hierarchy.
> > >> > > > > >
> > >> > > > >
> > >> > > > > These two coordinators will be on the same broker only
when
> > >> number of
> > >> > > > > partitions for transaction state topic and consumer
offset
> topic
> > >> are
> > >> > > the
> > >> > > > > same. This normally holds true, but I'm afraid
> > >> > > > > we couldn't make this assumption?
> > >> > > > >
> > >> > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION":
it seems
> the
> > >> > goal
> > >> > > of
> > >> > > > > > this config is just to avoid old-versioned broker
to not be
> > >> able to
> > >> > > > > > recognize newer versioned client. I think if we
can do
> something
> > >> > else
> > >> > > > to
> > >> > > > > > avoid this config though, for example we can use
the
> embedded
> > >> > > > AdminClient
> > >> > > > > > to send the APIVersion request upon starting up,
and based
> on
> > >> the
> > >> > > > > returned
> > >> > > > > > value decides whether to go to the old code path
or the new
> > >> > behavior.
> > >> > > > > > Admittedly asking a random broker about APIVersion
does not
> > >> > guarantee
> > >> > > > the
> > >> > > > > > whole cluster's versions, but what we can do is
to first 1)
> find
> > >> > the
> > >> > > > > > coordinator (and if the random broker does not
even
> recognize
> > >> the
> > >> > new
> > >> > > > > > discover type, fall back to old path directly),
and then 2)
> ask
> > >> the
> > >> > > > > > discovered coordinator about its supported APIVersion.
> > >> > > > > >
> > >> > > > >
> > >> > > > > The caveat here is that we have to make sure both the
group
> > >> > coordinator
> > >> > > > and
> > >> > > > > transaction coordinator are on the latest version during
init
> > >> stage.
> > >> > > This
> > >> > > > > is potentially doable as we only need a consumer group.id
> > >> > > > > to check that. In the meantime, a hard-coded config
is still a
> > >> > > favorable
> > >> > > > > backup in case the server has downgraded, so you will
want to
> use
> > >> a
> > >> > new
> > >> > > > > version client without `consumer group` transactional
support.
> > >> > > > >
> > >> > > > > 5. This is a meta question: have you considered how
this can
> be
> > >> > applied
> > >> > > > to
> > >> > > > > > Kafka Connect as well? For example, for source
connectors,
> the
> > >> > > > assignment
> > >> > > > > > is not by "partitions", but by some other sort
of
> "resources"
> > >> based
> > >> > > on
> > >> > > > > the
> > >> > > > > > source systems, how KIP-447 would affect Kafka
Connectors
> that
> > >> > > > > implemented
> > >> > > > > > EOS as well?
> > >> > > > > >
> > >> > > > >
> > >> > > > > No, it's not currently included in the scope. Could
you point
> me
> > >> to a
> > >> > > > > sample source connector who uses EOS? Could always
piggy-back
> into
> > >> > the
> > >> > > > > TxnProducerIdentity struct with more information such
as
> tasks. If
> > >> > > > > this is something to support in near term, an abstract
type
> called
> > >> > > > > "Resource" could be provided and let topic partition
and
> connect
> > >> task
> > >> > > > > implement it.
> > >> > > > >
> > >> > > > >
> > >> > > > > >
> > >> > > > > > Guozhang
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <
> ismael@juma.me.uk>
> > >> > > wrote:
> > >> > > > > >
> > >> > > > > > > Hi Boyang,
> > >> > > > > > >
> > >> > > > > > > Thanks for the KIP. It's good that we listed
a number of
> > >> rejected
> > >> > > > > > > alternatives. It would be helpful to have
an explanation
> of
> > >> why
> > >> > > they
> > >> > > > > were
> > >> > > > > > > rejected.
> > >> > > > > > >
> > >> > > > > > > Ismael
> > >> > > > > > >
> > >> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen
<
> > >> bchen11@outlook.com
> > >> > >
> > >> > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > Hey all,
> > >> > > > > > > >
> > >> > > > > > > > I would like to start a discussion for
KIP-447:
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > >> > > > > > > >
> > >> > > > > > > > this is a work originated by Jason Gustafson
and we
> would
> > >> like
> > >> > to
> > >> > > > > > proceed
> > >> > > > > > > > into discussion stage.
> > >> > > > > > > >
> > >> > > > > > > > Let me know your thoughts, thanks!
> > >> > > > > > > >
> > >> > > > > > > > Boyang
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > --
> > >> > > > > > -- Guozhang
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >> >
> > >>
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message