kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics
Date Fri, 02 Aug 2019 23:35:37 GMT
Thanks Boyang,

I've made another pass on KIP-447 as well as
https://github.com/apache/kafka/pull/7078, and have some minor comments
about the proposed API:

1. It seems that instead of the whole KafkaConsumer object, you only need
the "ConsumerGroupMetadata". In that case, can we just pass that object
into the initTxns call?

2. The current trunk already has a public class named ConsumerGroupMetadata
under o.a.k.clients.consumer, created by KIP-429. If we just use that, then
it makes less sense to declare a base GroupMetadata, as we are already
leaking such information on the assignor anyway.
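
To illustrate point 1, here is a minimal sketch of a producer that is
initialized from the group metadata alone rather than from a whole consumer.
It does not use the real Kafka classes: `GroupMetadataSketch`,
`TxnProducerSketch`, `fencingGeneration` and the chosen fields are
hypothetical stand-ins, and the `initTxns` signature is only an assumption
about what the call could look like.

```java
import java.util.Objects;

// Hypothetical stand-in for the public ConsumerGroupMetadata class mentioned
// above; the fields are assumptions for illustration, not the real class.
final class GroupMetadataSketch {
    final String groupId;
    final int generationId;
    final String memberId;

    GroupMetadataSketch(String groupId, int generationId, String memberId) {
        this.groupId = Objects.requireNonNull(groupId, "groupId");
        this.generationId = generationId;
        this.memberId = Objects.requireNonNull(memberId, "memberId");
    }
}

// Hypothetical producer facade: it depends on the metadata value only,
// never on a consumer instance.
final class TxnProducerSketch {
    private GroupMetadataSketch metadata;

    // Assumed shape of the call discussed in point 1.
    void initTxns(GroupMetadataSketch metadata) {
        this.metadata = Objects.requireNonNull(metadata, "metadata");
    }

    // Returns the generation the producer was initialized with.
    int fencingGeneration() {
        if (metadata == null) {
            throw new IllegalStateException("initTxns has not been called");
        }
        return metadata.generationId;
    }
}

public class InitTxnsSketch {
    public static void main(String[] args) {
        TxnProducerSketch producer = new TxnProducerSketch();
        producer.initTxns(new GroupMetadataSketch("my-group", 7, "member-1"));
        System.out.println("generation = " + producer.fencingGeneration());
    }
}
```

The point of the sketch is only that the producer needs a small immutable
value, so a generic Consumer implementation could supply it without exposing
the rest of its state.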


Guozhang

On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen <reluctanthero104@gmail.com>
wrote:

> Thank you Guozhang for the reply. We will consider the interface change
> from 429 as a backup plan for 447.
>
> And bumping this thread for more discussion.
>
> On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen <reluctanthero104@gmail.com>
> > wrote:
> >
> > > Thank you Guozhang for the suggestion! I would normally prefer naming a
> > > flag after its functionality. It seems to me that `isolation_level`
> > > adds another hop of indirection to figure out what the flag does.
> > >
> > Fair enough, let's use a separate flag name then :)
> >
> >
> > > As for the generation.id exposure, I'm fine with leveraging the new API
> > > from 429, but is that design finalized yet, and will the API be added
> > > to the generic Consumer<K, V> interface?
> > >
> > The current PartitionAssignor is inside the `internals` package, and in
> > KIP-429 we are going to create a new interface outside of `internals` to
> > really make it a public API; as part of that we are refactoring some of
> > its method signatures. I just feel some of the newly introduced classes
> > can be reused in your KIP as well, i.e. just for code succinctness, with
> > no semantic implications.
> >
> >
> > > Boyang
> > >
> > > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> > >
> > > > Boyang, thanks for the updated proposal!
> > > >
> > > > 3.a. As Jason mentioned, with EOS enabled we still need to augment the
> > > > offset fetch request with a boolean to indicate "give me a retriable
> > > > error code if there's a pending offset, rather than sending me the
> > > > committed offset immediately". Personally I still feel it is okay to
> > > > piggy-back on the ISOLATION_LEVEL boolean, but I'm also fine with
> > > > another `await_transaction` boolean if you feel strongly about it.
> > > >
> > > > 10. About the exposure of generation id, there may be some refactoring
> > > > work coming from KIP-429 that can benefit KIP-447 as well, since we are
> > > > wrapping the consumer subscription / assignment data in new classes.
> > > > Note that the current proposal does not include `generationId`, since
> > > > with the cooperative sticky assignor we think it is not necessary for
> > > > correctness; but if we agree it is okay to expose it, we can
> > > > potentially include it in `ConsumerAssignmentData` as well.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <
> > reluctanthero104@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you Jason for the ideas.
> > > > >
> > > > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <
> jason@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi Boyang,
> > > > > >
> > > > > > Thanks for the updates. A few comments below:
> > > > > >
> > > > > > > 1. The KIP mentions that `transaction.timeout.ms` should be
> > > > > > > reduced to 10s. I think this makes sense for Kafka Streams which
> > > > > > > is tied to the consumer group semantics and uses a default 10s
> > > > > > > session timeout. However, it seems a bit dangerous to make this
> > > > > > > change for the producer generally. Could we just change it for
> > > > > > > streams?
> > > > > > >
> > > > > > That sounds good to me.
> > > > >
> > > > > > > 2. The new `initTransactions` API takes a `Consumer` instance. I
> > > > > > > think the idea is to basically put in a backdoor to give the
> > > > > > > producer access to the group generationId. It's not clear to me
> > > > > > > how this would work given package restrictions. I wonder if it
> > > > > > > would be better to just expose the state we need from the
> > > > > > > consumer. I know we have been reluctant to do this so far because
> > > > > > > we treat the generationId as an implementation detail. However, I
> > > > > > > think we might just bite the bullet and expose it rather than
> > > > > > > coming up with a messy hack. Concepts such as memberIds have
> > > > > > > already been exposed in the AdminClient, so maybe it is not too
> > > > > > > bad. Alternatively, we could use an opaque type. For example:
> > > > > >
> > > > > > // public
> > > > > > interface GroupMetadata {}
> > > > > >
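> > > > > > > // private
> > > > > > > class ConsumerGroupMetadata implements GroupMetadata {
> > > > > > >   final int generationId;
> > > > > > >   final String memberId;
> > > > > > >
> > > > > > >   ConsumerGroupMetadata(int generationId, String memberId) {
> > > > > > >     this.generationId = generationId;
> > > > > > >     this.memberId = memberId;
> > > > > > >   }
> > > > > > > }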
> > > > > >
> > > > > > // Consumer API
> > > > > > public GroupMetadata groupMetadata();
> > > > > >
> > > > > > I am probably leaning toward just exposing the state we need.
> > > > > >
> > > > > > Yes. It is also worth mentioning that Kafka Streams uses the generic
> > > > > > Consumer API, which doesn't have rich state like a full
> > > > > > `KafkaConsumer`, so the hack will not work as expected.
> > > > > >
> > > > > > Instead, just exposing the consumer generation.id seems much easier.
> > > > > > We could consolidate the API and make it
> > > > >
> > > > > > > 3. Given that we are already providing a way to propagate group
> > > > > > > state from the consumer to the producer, I wonder if we may as
> > > > > > > well include the memberId and groupInstanceId. This would make
> > > > > > > the validation we do for TxnOffsetCommit consistent with
> > > > > > > OffsetCommit. If for no other benefit, at least this may help
> > > > > > > with debugging.
> > > > > > >
> > > > > >
> > > > > > Yes, we could put them into the GroupMetadata struct.
> > > > >
> > > > >
> > > > > > > 4. I like the addition of isolation_level to the offset fetch. At
> > > > > > > the same time, its behavior is a bit inconsistent with how it is
> > > > > > > used in the consumer generally. There is no reason for the group
> > > > > > > coordinator to ever expose aborted data, so this is mostly about
> > > > > > > awaiting pending offset commits, not reading uncommitted data.
> > > > > > > Perhaps instead of calling this "isolation level," it should be
> > > > > > > more like "await_pending_transaction" or something like that?
> > > > > > >
> > > > > > > Also, just to be clear, the consumer would treat this as an
> > > > > > > optional field, right? So if the broker does not support the
> > > > > > > latest OffsetFetch API, it would silently revert to reading the
> > > > > > > old data. Basically it would be up to the streams version probing
> > > > > > > logic to ensure that the expectation on this API fits with the
> > > > > > > usage of `transactional.id`.
> > > > > > >
> > > > > > That sounds like a better name to me, though I think it could be
> > > > > > shortened to `await_transaction`.
> > > > > > I think the field should be optional, too.
> > > > >
> > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen <
> > > reluctanthero104@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Guozhang,
> > > > > > >
> > > > > > > > I will correct my statement from the last email. I don't think
> > > > > > > > read_committed (3.a) needs to be added to the OffsetFetch
> > > > > > > > request, since in an EOS application the underlying consumers
> > > > > > > > within the group should always back off when there are pending
> > > > > > > > offsets.
> > > > > > >
> > > > > > > Let me know if you think this is correct.
> > > > > > >
> > > > > > > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen <
> > > > reluctanthero104@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thank you Guozhang for the questions, inline answers are
> below.
> > > > > > > >
> > > > > > > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <
> > > > > reluctanthero104@gmail.com
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hey all,
> > > > > > > >>
> > > > > > > > >> I have done a fundamental polish of KIP-447
> > > > > > > > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics>
> > > > > > > > >> and written a design doc
> > > > > > > > >> <https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#>
> > > > > > > > >> depicting internal changes. We stripped off many implementation
> > > > > > > > >> details from the KIP, and simplified the public changes by a lot.
> > > > > > > > >> For reviewers, it is highly recommended to fully understand EOS
> > > > > > > > >> design in KIP-98 and read its corresponding design doc
> > > > > > > > >> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit>
> > > > > > > > >> if you haven't done so already.
> > > > > > > > >>
> > > > > > > > >> Let me know if you found anything confusing around the KIP or
> > > > > > > > >> the design. Would be happy to discuss in depth.
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Boyang
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang <
> > > > wangguoz@gmail.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > > >>> 2. The reason we did not expose generation.id from the
> > > > > > > > >>> KafkaConsumer public APIs directly is to abstract this notion
> > > > > > > > >>> from users (since it is an implementation detail of the
> > > > > > > > >>> rebalance protocol itself, e.g. if users call consumer.assign()
> > > > > > > > >>> they do not need to invoke ConsumerCoordinator and do not need
> > > > > > > > >>> to be aware of generation.id at all).
> > > > > > > > >>>
> > > > > > > > >>> On the other hand, with the current proposal the
> > > > > > > > >>> txn.coordinator does not know about the latest generation from
> > > > > > > > >>> the source-of-truth group.coordinator; instead, it will only
> > > > > > > > >>> bump up the generation from the producer's
> > > > > > > > >>> InitProducerIdRequest.
> > > > > > > >>>
> > > > > > > >>> The key here is that GroupCoordinator, when handling
> > > > > > > >>> `InitProducerIdRequest
> > > > > > > >>>
> > > > > > > > > In the new design, we just pass the entire consumer instance
> > > > > > > > > into the producer through #initTransaction, so no public API
> > > > > > > > > will be created.
> > > > > > > >
> > > > > > > > >>> 3. I agree that if we rely on the group coordinator to block on
> > > > > > > > >>> returning the offset-fetch-response if read-committed is
> > > > > > > > >>> enabled, then we do not need to store partition assignment on
> > > > > > > > >>> the txn coordinator, and therefore it's better to still
> > > > > > > > >>> decouple them. For that case we still need to update the KIP
> > > > > > > > >>> wiki page to include:
> > > > > > > > >>>
> > > > > > > > >>> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as
> > > > > > > > >>> well.
> > > > > > > > >>> 3.b. Add a new error code in OffsetFetchResponse to let the
> > > > > > > > >>> client back off and retry if there are pending txns including
> > > > > > > > >>> the interested partitions.
> > > > > > > > >>> 3.c. Also, in the worst case we would let the client be blocked
> > > > > > > > >>> for the txn.timeout period, and for that reason we may need to
> > > > > > > > >>> consider reducing our default txn.timeout value as well.
> > > > > > > > >>>
> > > > > > > > > Addressed 3.b and 3.c, will do 3.a.
> > > > > > > >
> > > > > > > > >>> 4. According to Colin it seems we do not need to create another
> > > > > > > > >>> KIP and we can just complete it as part of KIP-117 /
> > > > > > > > >>> KAFKA-5214; and we need to do some cleanup to have
> > > > > > > > >>> BrokerApiVersion exposed from AdminClient (@Colin please let us
> > > > > > > > >>> know if you have any concerns exposing it).
> > > > > > > > >>>
> > > > > > > > > I think we no longer need to rely on the API version for
> > > > > > > > > initialization, since we will be using the upgrade.from config
> > > > > > > > > anyway.
> > > > > > > >
> > > > > > > >>
> > > > > > > >>> Guozhang
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson <
> > > > > jason@confluent.io>
> > > > > > > >>> wrote:
> > > > > > > >>>
> > > > > > > > >>> > For reference, we have BrokerApiVersionCommand already as a
> > > > > > > > >>> > public interface. We have a bit of tech debt at the moment
> > > > > > > > >>> > because it uses a custom AdminClient. It would be nice to
> > > > > > > > >>> > clean that up. In general, I think it is reasonable to expose
> > > > > > > > >>> > from AdminClient. It can be used by management tools to
> > > > > > > > >>> > inspect running Kafka versions for example.
> > > > > > > >>> >
> > > > > > > >>> > -Jason
> > > > > > > >>> >
> > > > > > > >>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <
> > > > > > > >>> reluctanthero104@gmail.com>
> > > > > > > >>> > wrote:
> > > > > > > >>> >
> > > > > > > > >>> > > Thank you for the context Colin. The groupId was indeed a
> > > > > > > > >>> > > copy-paste error.
> > > > > > > > >>> > > Our use case here for 447 is (quoted from Guozhang):
> > > > > > > > >>> > > '''
> > > > > > > > >>> > > I think if we can do something else to
> > > > > > > > >>> > > avoid this config though, for example we can use the
> > > > > > > > >>> > > embedded AdminClient to send the APIVersion request upon
> > > > > > > > >>> > > starting up, and based on the returned value decides
> > > > > > > > >>> > > whether to go to the old code path or the new behavior.
> > > > > > > > >>> > > '''
> > > > > > > > >>> > > The benefit we get is that we avoid adding a new
> > > > > > > > >>> > > configuration and make the decision simply based on the
> > > > > > > > >>> > > broker version. If you have concerns with exposing
> > > > > > > > >>> > > ApiVersion to clients, we could try to think of alternative
> > > > > > > > >>> > > solutions too.
> > > > > > > >>> > >
> > > > > > > >>> > > Boyang
> > > > > > > >>> > >
> > > > > > > >>> > >
> > > > > > > >>> > >
> > > > > > > >>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <
> > > > > cmccabe@apache.org
> > > > > > >
> > > > > > > >>> wrote:
> > > > > > > >>> > >
> > > > > > > > >>> > > > kafka.api.ApiVersion is an internal class, not suitable
> > > > > > > > >>> > > > for exposing through AdminClient.  That class is not even
> > > > > > > > >>> > > > accessible without having the broker jars on your
> > > > > > > > >>> > > > CLASSPATH.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > Another question is, what is the groupId parameter doing
> > > > > > > > >>> > > > in the call?  The API versions are the same no matter
> > > > > > > > >>> > > > what consumer group we use, right?  Perhaps this was a
> > > > > > > > >>> > > > copy and paste error?
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > This is not the first time we have discussed having a
> > > > > > > > >>> > > > method in AdminClient to retrieve API version
> > > > > > > > >>> > > > information.  In fact, the original KIP which created
> > > > > > > > >>> > > > KafkaAdminClient specified an API for fetching version
> > > > > > > > >>> > > > information.  It was called apiVersions and it is still
> > > > > > > > >>> > > > there on the wiki.  See
> > > > > > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > However, this API wasn't ready in time for 0.11.0 so we
> > > > > > > > >>> > > > shipped without it.  There was a JIRA to implement it for
> > > > > > > > >>> > > > later versions,
> > > > > > > > >>> > > > https://issues.apache.org/jira/browse/KAFKA-5214 , as
> > > > > > > > >>> > > > well as a PR, https://github.com/apache/kafka/pull/3012 .
> > > > > > > > >>> > > > However, we started to rethink whether this AdminClient
> > > > > > > > >>> > > > function was even necessary.  Most of the use-cases we
> > > > > > > > >>> > > > could think of seemed like horrible hacks.  So it has
> > > > > > > > >>> > > > never really been implemented (yet?).
> > > > > > > >>> > > >
> > > > > > > >>> > > > best,
> > > > > > > >>> > > > Colin
> > > > > > > >>> > > >
> > > > > > > >>> > > >
> > > > > > > >>> > > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > > > > > > > >>> > > > > Actually, on second thought, I think it makes sense to
> > > > > > > > >>> > > > > support auto upgrade through the admin client, to help
> > > > > > > > >>> > > > > us get the API version from the broker.
> > > > > > > > >>> > > > > A draft KIP is here:
> > > > > > > > >>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > Boyang
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <
> > > > > > > >>> > > reluctanthero104@gmail.com>
> > > > > > > >>> > > > > wrote:
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > > Thank you Guozhang, some of my understandings are
> > > > inline
> > > > > > > below.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson
> <
> > > > > > > >>> > jason@confluent.io
> > > > > > > >>> > > >
> > > > > > > >>> > > > > > wrote:
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >> > I think co-locating does have some merits
> here,
> > > i.e.
> > > > > > > >>> letting the
> > > > > > > >>> > > > > >> > ConsumerCoordinator which has the
> > source-of-truth
> > > of
> > > > > > > >>> assignment
> > > > > > > >>> > to
> > > > > > > >>> > > > act
> > > > > > > >>> > > > > >> as
> > > > > > > >>> > > > > >> > the TxnCoordinator as well; but I agree
> there's
> > > also
> > > > > > some
> > > > > > > >>> cons
> > > > > > > >>> > of
> > > > > > > >>> > > > > >> coupling
> > > > > > > >>> > > > > >> > them together. I'm still a bit inclining
> towards
> > > > > > > colocation
> > > > > > > >>> but
> > > > > > > >>> > if
> > > > > > > >>> > > > there
> > > > > > > >>> > > > > >> > are good rationales not to do so I can be
> > > convinced
> > > > as
> > > > > > > well.
> > > > > > > >>> > > > > >>
> > > > > > > >>> > > > > >>
> > > > > > > >>> > > > > >> The good rationale is that we have no mechanism
> to
> > > > > > colocate
> > > > > > > >>> > > > partitions ;).
> > > > > > > >>> > > > > >> Are you suggesting we store the group and
> > > transaction
> > > > > > state
> > > > > > > >>> in the
> > > > > > > >>> > > > same
> > > > > > > >>> > > > > >> log? Can you be more concrete about the benefit?
> > > > > > > >>> > > > > >>
> > > > > > > >>> > > > > >> -Jason
> > > > > > > >>> > > > > >>
> > > > > > > >>> > > > > >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <
> > > > > > > >>> > wangguoz@gmail.com>
> > > > > > > >>> > > > > >> wrote:
> > > > > > > >>> > > > > >>
> > > > > > > >>> > > > > >> > Hi Boyang,
> > > > > > > >>> > > > > >> >
> > > > > > > > >>> > > > > >> > 1. One advantage of retry against on-hold is that
> > > > > > > > >>> > > > > >> > it will not tie-up a handler thread (of course the
> > > > > > > > >>> > > > > >> > latter could do the same but that involves using a
> > > > > > > > >>> > > > > >> > purgatory which is more complicated), and also it
> > > > > > > > >>> > > > > >> > is less likely to violate request timeout. So I
> > > > > > > > >>> > > > > >> > think there are some rationales to prefer retries.
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >>
> > > > > > > > >>> > > > > > That sounds fair to me; we also avoid using another
> > > > > > > > >>> > > > > > purgatory instance. Usually one back-off only delays
> > > > > > > > >>> > > > > > us by 50ms during startup, which is a trivial cost.
> > > > > > > > >>> > > > > > This behavior shouldn't be changed.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > > 2. Regarding "ConsumerRebalanceListener": both
> > > > > > > >>> > > > ConsumerRebalanceListener
> > > > > > > >>> > > > > >> > and PartitionAssignors are user-customizable
> > > > modules,
> > > > > > and
> > > > > > > >>> only
> > > > > > > >>> > > > > >> difference
> > > > > > > >>> > > > > >> > is that the former is specified via code and
> the
> > > > > latter
> > > > > > is
> > > > > > > >>> > > > specified via
> > > > > > > >>> > > > > >> > config.
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >> > Regarding Jason's proposal of
> > ConsumerAssignment,
> > > > one
> > > > > > > thing
> > > > > > > >>> to
> > > > > > > >>> > > note
> > > > > > > >>> > > > > >> though
> > > > > > > >>> > > > > >> > with KIP-429 the onPartitionAssigned may not
> be
> > > > called
> > > > > > if
> > > > > > > >>> the
> > > > > > > >>> > > > assignment
> > > > > > > >>> > > > > >> > does not change, whereas onAssignment would
> > always
> > > > be
> > > > > > > >>> called at
> > > > > > > >>> > > the
> > > > > > > >>> > > > end
> > > > > > > >>> > > > > >> of
> > > > > > > >>> > > > > >> > sync-group response. My proposed semantics is
> > that
> > > > > > > >>> > > > > >> > `RebalanceListener#onPartitionsXXX` are used
> for
> > > > > > > >>> notifications
> > > > > > > >>> > to
> > > > > > > >>> > > > user,
> > > > > > > >>> > > > > >> and
> > > > > > > >>> > > > > >> > hence if there's no changes these will not be
> > > > called,
> > > > > > > >>> whereas
> > > > > > > >>> > > > > >> > `PartitionAssignor` is used for assignor
> logic,
> > > > whose
> > > > > > > >>> callback
> > > > > > > >>> > > would
> > > > > > > >>> > > > > >> always
> > > > > > > >>> > > > > >> > be called no matter if the partitions have
> > changed
> > > > or
> > > > > > not.
> > > > > > > >>> > > > > >>
> > > > > > > >>> > > > > >> I think a third option is to gracefully expose
> > > > > generation
> > > > > > id
> > > > > > > >>> as
> > > > > > > >>> > part
> > > > > > > >>> > > > of
> > > > > > > >>> > > > > > consumer API, so that we don't need to
> > > > > > > >>> > > > > > bother overloading various callbacks. Of course,
> > this
> > > > > > builds
> > > > > > > >>> upon
> > > > > > > >>> > the
> > > > > > > >>> > > > > > assumption that topic partitions
> > > > > > > >>> > > > > > will not be included in new initTransaction API.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > > 3. I feel it is a bit awkward to let the
> > > > TxnCoordinator
> > > > > > > >>> keeping
> > > > > > > >>> > > > partition
> > > > > > > >>> > > > > >> > assignments since it is sort of taking over
> the
> > > job
> > > > of
> > > > > > the
> > > > > > > >>> > > > > >> > ConsumerCoordinator, and may likely cause a
> > > > > split-brain
> > > > > > > >>> problem
> > > > > > > >>> > as
> > > > > > > >>> > > > two
> > > > > > > >>> > > > > >> > coordinators keep a copy of this assignment
> > which
> > > > may
> > > > > be
> > > > > > > >>> > > different.
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >> > I think co-locating does have some merits
> here,
> > > i.e.
> > > > > > > >>> letting the
> > > > > > > >>> > > > > >> > ConsumerCoordinator which has the
> > source-of-truth
> > > of
> > > > > > > >>> assignment
> > > > > > > >>> > to
> > > > > > > >>> > > > act
> > > > > > > >>> > > > > >> as
> > > > > > > >>> > > > > >> > the TxnCoordinator as well; but I agree
> there's
> > > also
> > > > > > some
> > > > > > > >>> cons
> > > > > > > >>> > of
> > > > > > > >>> > > > > >> coupling
> > > > > > > >>> > > > > >> > them together. I'm still a bit inclining
> towards
> > > > > > > colocation
> > > > > > > >>> but
> > > > > > > >>> > if
> > > > > > > >>> > > > there
> > > > > > > >>> > > > > >> > are good rationales not to do so I can be
> > > convinced
> > > > as
> > > > > > > well.
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >>
> > > > > > > >>> > > > > > The purpose of co-location is to let txn
> > coordinator
> > > > see
> > > > > > the
> > > > > > > >>> group
> > > > > > > >>> > > > > > assignment. This priority is weakened
> > > > > > > >>> > > > > > when we already have defense on the consumer
> offset
> > > > > fetch,
> > > > > > > so I
> > > > > > > >>> > guess
> > > > > > > >>> > > > it's
> > > > > > > >>> > > > > > not super important anymore.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >> > 4. I guess I'm preferring the philosophy of
> > "only
> > > > add
> > > > > > > >>> configs if
> > > > > > > >>> > > > > >> there's no
> > > > > > > >>> > > > > >> > other ways", since more and more configs would
> > > make
> > > > it
> > > > > > > less
> > > > > > > >>> and
> > > > > > > >>> > > less
> > > > > > > >>> > > > > >> > intuitive out of the box to use.
> > > > > > > >>> > > > > >> >
> > > > > > > > >>> > > > > >> > I think it's a valid point that a check upon
> > > > > > > > >>> > > > > >> > starting up does not cope with brokers
> > > > > > > > >>> > > > > >> > downgrading, but even with a config it is still
> > > > > > > > >>> > > > > >> > hard for users to determine when they can be sure
> > > > > > > > >>> > > > > >> > the brokers will never downgrade again and hence
> > > > > > > > >>> > > > > >> > can safely switch the config. So my feeling is
> > > > > > > > >>> > > > > >> > that this config would still not help much. If we
> > > > > > > > >>> > > > > >> > want to be on the safer side, then I'd suggest we
> > > > > > > > >>> > > > > >> > modify the Coordinator -> NetworkClient hierarchy
> > > > > > > > >>> > > > > >> > to allow the NetworkClient to pass the APIVersion
> > > > > > > > >>> > > > > >> > metadata to the Coordinator, so that the
> > > > > > > > >>> > > > > >> > Coordinator can rely on that logic to change its
> > > > > > > > >>> > > > > >> > behavior dynamically.
> > > > > > > >>> > > > > >>
> > > > > > > > >>> > > > > > The stream thread initialization cannot be supported
> > > > > > > > >>> > > > > > by changing the client coordinator behavior on the
> > > > > > > > >>> > > > > > fly; we only lose options after initialization (the
> > > > > > > > >>> > > > > > main thread exits and no thread has the global
> > > > > > > > >>> > > > > > picture anymore). If we do want to support auto
> > > > > > > > >>> > > > > > version detection, an admin client request would be
> > > > > > > > >>> > > > > > easier in this sense.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >> > 5. I do not have a concrete idea about how the
> > > > impact
> > > > > on
> > > > > > > >>> Connect
> > > > > > > >>> > > > would
> > > > > > > >>> > > > > >> > make, maybe Randall or Konstantine can help
> > here?
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >>
> > > > > > > >>> > > > > > Sounds good, let's see their thoughts.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >> > Guozhang
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <
> > > > > > > >>> > > > > >> reluctanthero104@gmail.com>
> > > > > > > >>> > > > > >> > wrote:
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >> > > Hey Jason,
> > > > > > > >>> > > > > >> > >
> > > > > > > >>> > > > > >> > > thank you for the proposal here. Some of my
> > > > thoughts
> > > > > > > >>> below.
> > > > > > > >>> > > > > >> > >
> > > > > > > >>> > > > > >> > > On Mon, Jun 24, 2019 at 8:58 PM Jason
> > Gustafson
> > > <
> > > > > > > >>> > > > jason@confluent.io>
> > > > > > > >>> > > > > >> > > wrote:
> > > > > > > >>> > > > > >> > >
> > > > > > > >>> > > > > >> > > > Hi Boyang,
> > > > > > > >>> > > > > >> > > >
> > > > > > > >>> > > > > >> > > > Thanks for picking this up! Still reading
> > > > through
> > > > > > the
> > > > > > > >>> > updates,
> > > > > > > >>> > > > but
> > > > > > > >>> > > > > >> here
> > > > > > > >>> > > > > >> > > are
> > > > > > > >>> > > > > >> > > > a couple initial comments on the APIs:
> > > > > > > >>> > > > > >> > > >
> > > > > > > > >>> > > > > >> > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think
> > > > > > > > >>> > > > > >> > > > we are trying to encapsulate state from the current group
> > > > > > > > >>> > > > > >> > > > assignment. Maybe something like `ConsumerAssignment` would
> > > > > > > > >>> > > > > >> > > > be clearer? If we make the usage consistent across the
> > > > > > > > >>> > > > > >> > > > consumer and producer, then we can avoid exposing internal
> > > > > > > > >>> > > > > >> > > > state like the generationId.
> > > > > > > >>> > > > > >> > > >
> > > > > > > >>> > > > > >> > > > For example:
> > > > > > > >>> > > > > >> > > >
> > > > > > > > >>> > > > > >> > > > // Public API
> > > > > > > > >>> > > > > >> > > > interface ConsumerAssignment {
> > > > > > > > >>> > > > > >> > > >   Set<TopicPartition> partitions();
> > > > > > > > >>> > > > > >> > > > }
> > > > > > > > >>> > > > > >> > > >
> > > > > > > > >>> > > > > >> > > > // Not a public API
> > > > > > > > >>> > > > > >> > > > class InternalConsumerAssignment implements ConsumerAssignment {
> > > > > > > > >>> > > > > >> > > >   Set<TopicPartition> partitions;
> > > > > > > > >>> > > > > >> > > >   int generationId;
> > > > > > > > >>> > > > > >> > > > }
> > > > > > > >>> > > > > >> > > >
> > > > > > > > >>> > > > > >> > > > Then we can change the rebalance listener to something like this:
> > > > > > > > >>> > > > > >> > > > onPartitionsAssigned(ConsumerAssignment assignment)
> > > > > > > > >>> > > > > >> > > >
> > > > > > > > >>> > > > > >> > > > And on the producer:
> > > > > > > > >>> > > > > >> > > > void initTransactions(String groupId, ConsumerAssignment assignment);
> > > > > > > >>> > > > > >> > > >
> > > > > > > > >>> > > > > >> > > > 2. Another bit of awkwardness is the fact that we have to
> > > > > > > > >>> > > > > >> > > > pass the groupId through both initTransactions() and
> > > > > > > > >>> > > > > >> > > > sendOffsetsToTransaction(). We could consider a config
> > > > > > > > >>> > > > > >> > > > instead. Maybe something like `transactional.group.id`?
> > > > > > > > >>> > > > > >> > > > Then we could simplify the producer APIs, potentially even
> > > > > > > > >>> > > > > >> > > > deprecating the current sendOffsetsToTransaction. In fact,
> > > > > > > > >>> > > > > >> > > > for this new usage, the `transactional.id` config is not
> > > > > > > > >>> > > > > >> > > > needed. It would be nice if we don't have to provide it.
> > > > > > > >>> > > > > >> > > >
> > > > > > > > >>> > > > > >> > > I like the idea of combining 1 and 2. We could definitely
> > > > > > > > >>> > > > > >> > > pass in a group.id config so that we could avoid exposing
> > > > > > > > >>> > > > > >> > > that information in a public API. The question I have is
> > > > > > > > >>> > > > > >> > > whether we should name the interface `GroupAssignment`
> > > > > > > > >>> > > > > >> > > instead, so that Connect could later extend the same
> > > > > > > > >>> > > > > >> > > interface, just to echo Guozhang's point here. Also, the
> > > > > > > > >>> > > > > >> > > base interface would be better defined empty for easy
> > > > > > > > >>> > > > > >> > > extension, or we could define an abstract type called
> > > > > > > > >>> > > > > >> > > `Resource` to be shareable later, IMHO.
> > > > > > > >>> > > > > >> > >
> > > > > > > >>> > > > > >> > >
> > > > > > > > >>> > > > > >> > > > By the way, I'm a bit confused about the discussion above
> > > > > > > > >>> > > > > >> > > > about colocating the txn and group coordinators. That is
> > > > > > > > >>> > > > > >> > > > not actually necessary, is it?
> > > > > > > >>> > > > > >> > > >
> > > > > > > > >>> > > > > >> > > Yes, this is not a requirement for this KIP, because it is
> > > > > > > > >>> > > > > >> > > inherently impossible to guarantee co-location of the topic
> > > > > > > > >>> > > > > >> > > partitions of the transaction log and the consumed offset
> > > > > > > > >>> > > > > >> > > topics.
> > > > > > > >>> > > > > >> > >
> > > > > > > >>> > > > > >> > >
> > > > > > > >>> > > > > >> > > > Thanks,
> > > > > > > >>> > > > > >> > > > Jason
> > > > > > > >>> > > > > >> > > >
> > > > > > > >>> > > > > >> > > On Mon, Jun 24, 2019 at 10:07 AM Boyang
> Chen <
> > > > > > > >>> > > > > >> reluctanthero104@gmail.com
> > > > > > > >>> > > > > >> > >
> > > > > > > >>> > > > > >> > > > wrote:
> > > > > > > >>> > > > > >> > > >
> > > > > > > > >>> > > > > >> > > > > Thank you Ismael for the suggestion. We will attempt to
> > > > > > > > >>> > > > > >> > > > > address it by adding more detail to the rejected
> > > > > > > > >>> > > > > >> > > > > alternatives section.
> > > > > > > > >>> > > > > >> > > > >
> > > > > > > > >>> > > > > >> > > > >
> > > > > > > > >>> > > > > >> > > > > Thank you for the comment Guozhang! Answers are inline below.
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang
> > > Wang
> > > > <
> > > > > > > >>> > > > wangguoz@gmail.com
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >> > > > wrote:
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > > > > Hello Boyang,
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > > > Thanks for the KIP, I have some
> comments
> > > > > below:
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > > > 1. "Once transactions are complete,
> the
> > > call
> > > > > > will
> > > > > > > >>> > return."
> > > > > > > >>> > > > This
> > > > > > > >>> > > > > >> > seems
> > > > > > > >>> > > > > >> > > > > > different from the existing behavior,
> in
> > > > which
> > > > > > we
> > > > > > > >>> would
> > > > > > > >>> > > > return a
> > > > > > > >>> > > > > >> > > > > retriable
> > > > > > > >>> > > > > >> > > > > > CONCURRENT_TRANSACTIONS and let the
> > client
> > > > to
> > > > > > > >>> retry, is
> > > > > > > >>> > > this
> > > > > > > >>> > > > > >> > > > intentional?
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > > >>> > > > > >> > > > > I don’t think it is intentional, and I will defer this
> > > > > > > > >>> > > > > >> > > > > question to Jason when he has time to answer, since from
> > > > > > > > >>> > > > > >> > > > > what I understand retrying and holding both seem to be
> > > > > > > > >>> > > > > >> > > > > valid approaches.
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > > > > 2. "an overload to
> onPartitionsAssigned
> > in
> > > > the
> > > > > > > >>> > consumer's
> > > > > > > >>> > > > > >> rebalance
> > > > > > > >>> > > > > >> > > > > > listener interface": as part of
> KIP-341
> > > > we've
> > > > > > > >>> already
> > > > > > > >>> > add
> > > > > > > >>> > > > this
> > > > > > > >>> > > > > >> > > > > information
> > > > > > > >>> > > > > >> > > > > > to the onAssignment callback. Would
> this
> > > be
> > > > > > > >>> sufficient?
> > > > > > > >>> > Or
> > > > > > > >>> > > > more
> > > > > > > >>> > > > > >> > > > generally
> > > > > > > >>> > > > > >> > > > > > speaking, which information have to be
> > > > passed
> > > > > > > >>> around in
> > > > > > > >>> > > > > >> rebalance
> > > > > > > >>> > > > > >> > > > > callback
> > > > > > > >>> > > > > >> > > > > > while others can be passed around in
> > > > > > > >>> PartitionAssignor
> > > > > > > >>> > > > > >> callback? In
> > > > > > > >>> > > > > >> > > > > Streams
> > > > > > > >>> > > > > >> > > > > > for example both callbacks are used
> but
> > > most
> > > > > > > >>> critical
> > > > > > > >>> > > > > >> information
> > > > > > > >>> > > > > >> > is
> > > > > > > >>> > > > > >> > > > > passed
> > > > > > > >>> > > > > >> > > > > > via onAssignment.
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > > >>> > > > > >> > > > > We still need to extend ConsumerRebalanceListener because
> > > > > > > > >>> > > > > >> > > > > it’s the interface we have public access to. The
> > > > > > > > >>> > > > > >> > > > > #onAssignment call is defined at the PartitionAssignor
> > > > > > > > >>> > > > > >> > > > > level, which is not easy to use with external producers.
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > > > > 3. "We propose to use a separate
> record
> > > type
> > > > > in
> > > > > > > >>> order to
> > > > > > > >>> > > > store
> > > > > > > >>> > > > > >> the
> > > > > > > >>> > > > > >> > > > group
> > > > > > > >>> > > > > >> > > > > > assignment.": hmm, I thought with the
> > > third
> > > > > > typed
> > > > > > > >>> > > > > >> FindCoordinator,
> > > > > > > >>> > > > > >> > > the
> > > > > > > >>> > > > > >> > > > > same
> > > > > > > >>> > > > > >> > > > > > broker that act as the  consumer
> > > coordinator
> > > > > > would
> > > > > > > >>> > always
> > > > > > > >>> > > be
> > > > > > > >>> > > > > >> > selected
> > > > > > > >>> > > > > >> > > > as
> > > > > > > >>> > > > > >> > > > > > the txn coordinator, in which case it
> > can
> > > > > access
> > > > > > > its
> > > > > > > >>> > local
> > > > > > > >>> > > > cache
> > > > > > > >>> > > > > >> > > > > metadata /
> > > > > > > >>> > > > > >> > > > > > offset topic to get this information
> > > > already?
> > > > > We
> > > > > > > >>> just
> > > > > > > >>> > need
> > > > > > > >>> > > > to
> > > > > > > >>> > > > > >> think
> > > > > > > >>> > > > > >> > > > about
> > > > > > > >>> > > > > >> > > > > > how to make these two modules directly
> > > > > exchange
> > > > > > > >>> > > information
> > > > > > > >>> > > > > >> without
> > > > > > > >>> > > > > >> > > > > messing
> > > > > > > >>> > > > > >> > > > > > up the code hierarchy.
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > > >>> > > > > >> > > > > These two coordinators will be on the same broker only
> > > > > > > > >>> > > > > >> > > > > when the number of partitions for the transaction state
> > > > > > > > >>> > > > > >> > > > > topic and the consumer offset topic are the same. This
> > > > > > > > >>> > > > > >> > > > > normally holds true, but I'm afraid we couldn't make this
> > > > > > > > >>> > > > > >> > > > > assumption?
> > > > > > > >>> > > > > >> > > > >
> > > > > > > > >>> > > > > >> > > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it
> > > > > > > > >>> > > > > >> > > > > > seems the goal of this config is just to avoid an
> > > > > > > > >>> > > > > >> > > > > > old-versioned broker being unable to recognize a
> > > > > > > > >>> > > > > >> > > > > > newer-versioned client. I think we can do something else
> > > > > > > > >>> > > > > >> > > > > > to avoid this config, though. For example, we can use
> > > > > > > > >>> > > > > >> > > > > > the embedded AdminClient to send the APIVersion request
> > > > > > > > >>> > > > > >> > > > > > upon starting up, and based on the returned value decide
> > > > > > > > >>> > > > > >> > > > > > whether to go to the old code path or the new behavior.
> > > > > > > > >>> > > > > >> > > > > > Admittedly, asking a random broker about APIVersion does
> > > > > > > > >>> > > > > >> > > > > > not guarantee the whole cluster's versions, but what we
> > > > > > > > >>> > > > > >> > > > > > can do is to first 1) find the coordinator (and if the
> > > > > > > > >>> > > > > >> > > > > > random broker does not even recognize the new discover
> > > > > > > > >>> > > > > >> > > > > > type, fall back to the old path directly), and then 2)
> > > > > > > > >>> > > > > >> > > > > > ask the discovered coordinator about its supported
> > > > > > > > >>> > > > > >> > > > > > APIVersion.
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > > >>> > > > > >> > > > > The caveat here is that we have to make sure both the
> > > > > > > > >>> > > > > >> > > > > group coordinator and transaction coordinator are on the
> > > > > > > > >>> > > > > >> > > > > latest version during the init stage. This is potentially
> > > > > > > > >>> > > > > >> > > > > doable, as we only need a consumer group.id to check
> > > > > > > > >>> > > > > >> > > > > that. In the meantime, a hard-coded config is still a
> > > > > > > > >>> > > > > >> > > > > favorable backup in case the server has been downgraded,
> > > > > > > > >>> > > > > >> > > > > in which case you will want to use a new-version client
> > > > > > > > >>> > > > > >> > > > > without `consumer group` transactional support.
> > > > > > > >>> > > > > >> > > > >
> > > > > > > > >>> > > > > >> > > > > > 5. This is a meta question: have you considered how this
> > > > > > > > >>> > > > > >> > > > > > can be applied to Kafka Connect as well? For example,
> > > > > > > > >>> > > > > >> > > > > > for source connectors, the assignment is not by
> > > > > > > > >>> > > > > >> > > > > > "partitions", but by some other sort of "resources"
> > > > > > > > >>> > > > > >> > > > > > based on the source systems. How would KIP-447 affect
> > > > > > > > >>> > > > > >> > > > > > Kafka connectors that implement EOS as well?
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > > >>> > > > > >> > > > > No, it's not currently included in the scope. Could you
> > > > > > > > >>> > > > > >> > > > > point me to a sample source connector that uses EOS? We
> > > > > > > > >>> > > > > >> > > > > could always piggy-back more information, such as tasks,
> > > > > > > > >>> > > > > >> > > > > onto the TxnProducerIdentity struct. If this is something
> > > > > > > > >>> > > > > >> > > > > to support in the near term, an abstract type called
> > > > > > > > >>> > > > > >> > > > > "Resource" could be provided, with topic partition and
> > > > > > > > >>> > > > > >> > > > > connect task implementing it.
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > > > Guozhang
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael
> > > Juma
> > > > <
> > > > > > > >>> > > > ismael@juma.me.uk>
> > > > > > > >>> > > > > >> > > wrote:
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > > > > Hi Boyang,
> > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > >>> > > > > >> > > > > > > Thanks for the KIP. It's good that we listed a number
> > > > > > > > >>> > > > > >> > > > > > > of rejected alternatives. It would be helpful to have
> > > > > > > > >>> > > > > >> > > > > > > an explanation of why they were rejected.
> > > > > > > >>> > > > > >> > > > > > >
> > > > > > > >>> > > > > >> > > > > > > Ismael
> > > > > > > >>> > > > > >> > > > > > >
> > > > > > > >>> > > > > >> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM
> Boyang
> > > > Chen
> > > > > <
> > > > > > > >>> > > > > >> bchen11@outlook.com
> > > > > > > >>> > > > > >> > >
> > > > > > > >>> > > > > >> > > > > wrote:
> > > > > > > >>> > > > > >> > > > > > >
> > > > > > > >>> > > > > >> > > > > > > > Hey all,
> > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > >>> > > > > >> > > > > > > > I would like to start a discussion for KIP-447:
> > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > >>> > > > > >> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > >>> > > > > >> > > > > > > > This work was originated by Jason Gustafson, and we
> > > > > > > > >>> > > > > >> > > > > > > > would like to proceed to the discussion stage.
> > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > >>> > > > > >> > > > > > > > Let me know your thoughts, thanks!
> > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > >>> > > > > >> > > > > > > > Boyang
> > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > >>> > > > > >> > > > > > >
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > > > --
> > > > > > > >>> > > > > >> > > > > > -- Guozhang
> > > > > > > >>> > > > > >> > > > > >
> > > > > > > >>> > > > > >> > > > >
> > > > > > > >>> > > > > >> > > >
> > > > > > > >>> > > > > >> > >
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >> > --
> > > > > > > >>> > > > > >> > -- Guozhang
> > > > > > > >>> > > > > >> >
> > > > > > > >>> > > > > >>
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > >
> > > > > > > >>> > > >
> > > > > > > >>> > >
> > > > > > > >>> >
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> --
> > > > > > > >>> -- Guozhang
> > > > > > > >>>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
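
For readers following the API discussion quoted above, here is a minimal Java sketch of the `ConsumerAssignment` idea: the public interface exposes only the partitions, while an internal implementation carries the generation id for the producer to read. This is only an illustration under stated assumptions, not code from the KIP or the PR. `TopicPartition` is a local stand-in for the real Kafka class, and `SketchProducer` with its accessors is hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Local stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles alone.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Public view of an assignment: callers can only see the partitions.
interface ConsumerAssignment {
    Set<TopicPartition> partitions();
}

// Internal implementation: additionally carries the generation id.
final class InternalConsumerAssignment implements ConsumerAssignment {
    private final Set<TopicPartition> partitions;
    private final int generationId;

    InternalConsumerAssignment(Set<TopicPartition> partitions, int generationId) {
        // Defensive, read-only copy so callers cannot mutate the assignment.
        this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
        this.generationId = generationId;
    }

    @Override
    public Set<TopicPartition> partitions() {
        return partitions;
    }

    // Package-private: visible to client internals, not to users of the interface.
    int generationId() {
        return generationId;
    }
}

// Hypothetical producer-side class (NOT the real KafkaProducer).
final class SketchProducer {
    private String groupId;
    private int generationId = -1;

    void initTransactions(String groupId, ConsumerAssignment assignment) {
        // The generation id is recovered internally instead of being passed explicitly.
        if (!(assignment instanceof InternalConsumerAssignment)) {
            throw new IllegalArgumentException("assignment must come from the consumer");
        }
        this.groupId = groupId;
        this.generationId = ((InternalConsumerAssignment) assignment).generationId();
    }

    String groupId() {
        return groupId;
    }

    int generationId() {
        return generationId;
    }
}
```

The point of the split is that application code passes the opaque `ConsumerAssignment` from the rebalance listener to the producer without ever touching the generation id directly.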
