kafka-dev mailing list archives

From Boyang Chen <reluctanthero...@gmail.com>
Subject Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics
Date Tue, 02 Jul 2019 22:21:36 GMT
Thank you, Guozhang, for the questions; inline answers are below.

On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <reluctanthero104@gmail.com>
wrote:

> Hey all,
>
> I have done a fundamental polish of KIP-447
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics>
> and written a design doc
> <https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#>
> depicting internal changes. We stripped off many implementation details from the KIP,
> and simplified the public changes by a lot. For reviewers, it is highly
> recommended to fully understand EOS design in KIP-98 and read its
> corresponding design doc
> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit>
> if you haven't done so already.
>
> Let me know if you found anything confusing around the KIP or the design.
> Would be happy to discuss in depth.
>
> Best,
> Boyang
>
>
>
>
>
> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang <wangguoz@gmail.com> wrote:
>
>> 2. The reason we did not expose generation.id from KafkaConsumer public
>> APIs directly is to abstract this notion from users (since it is an
>> implementation detail of the rebalance protocol itself, e.g. if user calls
>> consumer.assign() they do not need to invoke ConsumerCoordinator and no
>> need to be aware of generation.id at all).
>>
>> On the other hand, with the current proposal the txn.coordinator does not
>> know about the latest generation from the source-of-truth
>> group.coordinator; instead, it will only bump up the generation from the
>> producer's InitProducerIdRequest.
>>
>> The key here is that GroupCoordinator, when handling
>> `InitProducerIdRequest
>>
In the new design, we just pass the entire consumer instance into the
producer through #initTransaction, so no public API will be created.
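A minimal sketch of the idea above, under stated assumptions: `SketchConsumer`, `SketchProducer` and `GroupState` are hypothetical stand-ins rather than Kafka classes, and the package-private `groupState()` accessor is an assumed way for the producer to reach the consumer's group metadata without adding a public method.

```java
import java.util.Objects;

// Hypothetical snapshot of the consumer's group membership (not a Kafka class).
final class GroupState {
    final String groupId;
    final int generationId;

    GroupState(String groupId, int generationId) {
        this.groupId = groupId;
        this.generationId = generationId;
    }
}

// Hypothetical consumer: group state is reachable only through a package-private
// accessor, so no new public method is added.
class SketchConsumer {
    private GroupState state;

    SketchConsumer(String groupId) {
        this.state = new GroupState(groupId, 0);
    }

    // Called when a rebalance completes with a new generation.
    void onRebalanceCompleted(int newGeneration) {
        state = new GroupState(state.groupId, newGeneration);
    }

    GroupState groupState() {
        return state;
    }
}

// Hypothetical producer: keeps the consumer reference and reads the group state
// at commit time instead of copying it once during initialization.
class SketchProducer {
    private SketchConsumer consumer;

    void initTransactions(SketchConsumer consumer) {
        this.consumer = Objects.requireNonNull(consumer);
    }

    // Illustrative: the group id and generation an offset commit would carry.
    String commitIdentity() {
        GroupState s = consumer.groupState();
        return s.groupId + "@" + s.generationId;
    }
}
```

Holding the consumer reference, instead of taking a one-time copy, makes a later rebalance visible at commit time. This is one possible design choice, not necessarily what the KIP does.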

>> 3. I agree that if we rely on the group coordinator to block on returning
>> offset-fetch-response if read-committed is enabled, then we do not need to
>> store partition assignment on txn coordinator and therefore it's better to
>> still decouple them. For that case we still need to update the KIP wiki
>> page that includes:
>>
>> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
>> 3.b. Add new error code in OffsetFetchResponse to let client backoff and
>> retry if there are pending txns including the interested partitions.
>> 3.c. Also in the worst case we would let the client be blocked for the
>> txn.timeout period, and for that rationale we may need to consider
>> reducing our default txn.timeout value as well.
>>
Addressed 3.b and 3.c, will do 3.a.
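Points 3.a and 3.b can be sketched as a broker-side decision. This is a hypothetical illustration with several simplifications:

- `PENDING_TRANSACTION` is a placeholder name for the proposed new error code.
- Partitions are plain "topic-partition" strings.
- The in-memory maps stand in for whatever state the group coordinator actually keeps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

// PENDING_TRANSACTION is a hypothetical stand-in for the error code proposed in 3.b.
enum FetchError { NONE, PENDING_TRANSACTION }

final class OffsetFetchResult {
    final FetchError error;
    final Map<String, Long> offsets; // keyed by "topic-partition" for brevity

    OffsetFetchResult(FetchError error, Map<String, Long> offsets) {
        this.error = error;
        this.offsets = offsets;
    }
}

final class OffsetFetchHandler {
    // Under READ_COMMITTED, answer with a retriable error if any requested partition
    // still has an offset commit inside an open transaction; otherwise return offsets.
    static OffsetFetchResult handle(IsolationLevel level,
                                    Set<String> requested,
                                    Map<String, Long> committed,
                                    Set<String> partitionsWithPendingTxnOffsets) {
        if (level == IsolationLevel.READ_COMMITTED) {
            for (String tp : requested) {
                if (partitionsWithPendingTxnOffsets.contains(tp)) {
                    return new OffsetFetchResult(FetchError.PENDING_TRANSACTION, Map.of());
                }
            }
        }
        Map<String, Long> out = new HashMap<>();
        for (String tp : requested) {
            Long offset = committed.get(tp);
            if (offset != null) {
                out.put(tp, offset);
            }
        }
        return new OffsetFetchResult(FetchError.NONE, out);
    }
}
```

The client would back off and retry on `PENDING_TRANSACTION`. In the worst case the wait is bounded by the transaction timeout, which is the concern raised in 3.c.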

>> 4. According to Colin it seems we do not need to create another KIP and we
>> can just complete it as part of KIP-117 / KAFKA-5214; and we need to do
>> some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin
>> please let us know if you have any concerns exposing it).
>>
I think we no longer need to rely on the API version for initialization,
since we will be using the upgrade.from config anyway.

>
>> Guozhang
>>
>>
>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson <jason@confluent.io>
>> wrote:
>>
>> > For reference, we have BrokerApiVersionCommand already as a public
>> > interface. We have a bit of tech debt at the moment because it uses a
>> > custom AdminClient. It would be nice to clean that up. In general, I think
>> > it is reasonable to expose this from AdminClient. It can be used by
>> > management tools to inspect running Kafka versions, for example.
>> >
>> > -Jason
>> >
>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <reluctanthero104@gmail.com>
>> > wrote:
>> >
>> > > Thank you for the context, Colin. The groupId was indeed a copy-paste error.
>> > > Our use case here for 447 is (quoted from Guozhang):
>> > > '''
>> > > I think if we can do something else to avoid this config though, for
>> > > example we can use the embedded AdminClient to send the APIVersion request
>> > > upon starting up, and based on the returned value decide whether to go to
>> > > the old code path or the new behavior.
>> > > '''
>> > > The benefit we get is to avoid adding a new configuration to make a
>> > > decision based simply on the broker version. If you have concerns with
>> > > exposing ApiVersion to clients, we could try to think of alternative
>> > > solutions too.
>> > >
>> > > Boyang
>> > >
>> > >
>> > >
>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmccabe@apache.org> wrote:
>> > >
>> > > > kafka.api.ApiVersion is an internal class, not suitable for exposing
>> > > > through AdminClient.  That class is not even accessible without having the
>> > > > broker jars on your CLASSPATH.
>> > > >
>> > > > Another question is, what is the groupId parameter doing in the call? The
>> > > > API versions are the same no matter what consumer group we use, right?
>> > > > Perhaps this was a copy and paste error?
>> > > >
>> > > > This is not the first time we have discussed having a method in
>> > > > AdminClient to retrieve API version information.  In fact, the original KIP
>> > > > which created KafkaAdminClient specified an API for fetching version
>> > > > information.  It was called apiVersions and it is still there on the wiki.
>> > > > See
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
>> > > >
>> > > > However, this API wasn't ready in time for 0.11.0 so we shipped without
>> > > > it.  There was a JIRA to implement it for later versions,
>> > > > https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR,
>> > > > https://github.com/apache/kafka/pull/3012 .  However, we started to
>> > > > rethink whether this AdminClient function was even necessary.  Most of the
>> > > > use-cases we could think of seemed like horrible hacks.  So it has never
>> > > > really been implemented (yet?).
>> > > >
>> > > > best,
>> > > > Colin
>> > > >
>> > > >
>> > > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
>> > > > > Actually, on second thought, I think it makes sense to support auto
>> > > > > upgrade through the admin client to help us get the API version from
>> > > > > the broker.
>> > > > > A draft KIP is here:
>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
>> > > > >
>> > > > > Boyang
>> > > > >
>> > > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <reluctanthero104@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Thank you, Guozhang; some of my understandings are inline below.
>> > > > > >
>> > > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <jason@confluent.io>
>> > > > > > wrote:
>> > > > > >
>> > > > > >> >
>> > > > > >> > I think co-locating does have some merits here, i.e. letting the
>> > > > > >> > ConsumerCoordinator, which has the source of truth for the assignment,
>> > > > > >> > act as the TxnCoordinator as well; but I agree there are also some
>> > > > > >> > cons to coupling them together. I'm still a bit inclined towards
>> > > > > >> > colocation, but if there are good rationales not to do so I can be
>> > > > > >> > convinced as well.
>> > > > > >>
>> > > > > >>
>> > > > > >> The good rationale is that we have no mechanism to colocate partitions ;).
>> > > > > >> Are you suggesting we store the group and transaction state in the same
>> > > > > >> log? Can you be more concrete about the benefit?
>> > > > > >>
>> > > > > >> -Jason
>> > > > > >>
>> > > > > >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangguoz@gmail.com>
>> > > > > >> wrote:
>> > > > > >>
>> > > > > >> > Hi Boyang,
>> > > > > >> >
>> > > > > >> > 1. One advantage of retrying over holding the request is that it will
>> > > > > >> > not tie up a handler thread (of course the latter could avoid that too,
>> > > > > >> > but it involves using a purgatory, which is more complicated), and it is
>> > > > > >> > also less likely to violate the request timeout. So I think there are
>> > > > > >> > some rationales to prefer retries.
>> > > > > >> >
>> > > > > >>
>> > > > > > That sounds fair to me; it also avoids another purgatory instance.
>> > > > > > A single back-off usually delays only 50ms during startup, which is a
>> > > > > > trivial cost. This behavior shouldn't be changed.
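The retry-based approach discussed above can be sketched as a client-side loop. Everything here is illustrative:

- `TxnError` and `RetryingCaller` are hypothetical types.
- Only the 50ms back-off figure comes from the thread.
- The sleeper is injected so the loop can be exercised without real waiting.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical response codes for this sketch.
enum TxnError { NONE, CONCURRENT_TRANSACTIONS }

final class RetryingCaller {
    // Re-send a request while the broker answers with a retriable error, sleeping
    // backoffMs between attempts, and give up once the next back-off would exceed
    // maxWaitMs in total. Returns the number of attempts it took to succeed.
    static int callWithRetry(Supplier<TxnError> send, long backoffMs, long maxWaitMs,
                             LongConsumer sleeper) {
        long waited = 0;
        int attempts = 0;
        while (true) {
            attempts++;
            TxnError err = send.get();
            if (err == TxnError.NONE) {
                return attempts;
            }
            if (waited + backoffMs > maxWaitMs) {
                throw new IllegalStateException("still " + err + " after " + attempts + " attempts");
            }
            sleeper.accept(backoffMs);
            waited += backoffMs;
        }
    }
}
```

No broker handler thread is held between attempts. The cost is a short client-side delay per retry.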
>> > > > > >
>> > > > > >> > 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener
>> > > > > >> > and PartitionAssignors are user-customizable modules, and the only
>> > > > > >> > difference is that the former is specified via code and the latter via
>> > > > > >> > config.
>> > > > > >> >
>> > > > > >> > Regarding Jason's proposal of ConsumerAssignment, one thing to note is
>> > > > > >> > that with KIP-429, onPartitionsAssigned may not be called if the
>> > > > > >> > assignment does not change, whereas onAssignment would always be called
>> > > > > >> > at the end of the sync-group response. My proposed semantics is that
>> > > > > >> > `RebalanceListener#onPartitionsXXX` are used for notifications to the
>> > > > > >> > user, and hence will not be called if there are no changes, whereas
>> > > > > >> > `PartitionAssignor` is used for assignor logic, whose callback would
>> > > > > >> > always be called whether or not the partitions have changed.
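The callback semantics in point 2 can be simulated with hypothetical types. `CallbackSemantics` is not a Kafka class. In this sketch, the assignor-style callback fires on every sync-group response, while the listener-style notification fires only when new partitions arrive.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical simulation of the two callback styles; not Kafka's interfaces.
final class CallbackSemantics {
    private Set<String> owned = new HashSet<>();
    final List<String> log = new ArrayList<>();

    // Invoked once per sync-group response with the full assignment for this member.
    void onSyncGroupResponse(Set<String> assigned, int generation) {
        // Assignor-style callback: always invoked, so it sees every generation.
        log.add("onAssignment gen=" + generation);

        // Listener-style notification: only when new partitions were added.
        Set<String> added = new TreeSet<>(assigned);
        added.removeAll(owned);
        if (!added.isEmpty()) {
            log.add("onPartitionsAssigned " + added);
        }
        owned = new HashSet<>(assigned);
    }
}
```

In a scenario where generation 2 brings the same partitions as generation 1, only the assignor-style callback observes the new generation. This is why a generation id delivered solely through the listener could be missed.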
>> > > > > >>
>> > > > > > I think a third option is to gracefully expose the generation id as part
>> > > > > > of the consumer API, so that we don't need to bother overloading various
>> > > > > > callbacks. Of course, this builds upon the assumption that topic
>> > > > > > partitions will not be included in the new initTransaction API.
>> > > > > >
>> > > > > >> > 3. I feel it is a bit awkward to let the TxnCoordinator keep partition
>> > > > > >> > assignments, since it is sort of taking over the job of the
>> > > > > >> > ConsumerCoordinator, and it may cause a split-brain problem as two
>> > > > > >> > coordinators keep a copy of this assignment, which may differ.
>> > > > > >> >
>> > > > > >> > I think co-locating does have some merits here, i.e. letting the
>> > > > > >> > ConsumerCoordinator, which has the source of truth for the assignment,
>> > > > > >> > act as the TxnCoordinator as well; but I agree there are also some
>> > > > > >> > cons to coupling them together. I'm still a bit inclined towards
>> > > > > >> > colocation, but if there are good rationales not to do so I can be
>> > > > > >> > convinced as well.
>> > > > > >> >
>> > > > > >> >
>> > > > > >>
>> > > > > > The purpose of co-location is to let the txn coordinator see the group
>> > > > > > assignment. This priority is weakened once we already have a defense on
>> > > > > > the consumer offset fetch, so I guess it's not super important anymore.
>> > > > > >
>> > > > > >
>> > > > > >> > 4. I guess I prefer the philosophy of "only add configs if there's no
>> > > > > >> > other way", since more and more configs would make it less and less
>> > > > > >> > intuitive to use out of the box.
>> > > > > >> >
>> > > > > >> > I think it's a valid point that checks upon starting up do not cope with
>> > > > > >> > brokers downgrading, but even with a config, it is still hard for users
>> > > > > >> > to determine when they can be sure the broker will never downgrade
>> > > > > >> > anymore and hence can safely switch the config. So my feeling is that
>> > > > > >> > this config would still not help much. If we want to be on the safer
>> > > > > >> > side, then I'd suggest we modify the Coordinator -> NetworkClient
>> > > > > >> > hierarchy to allow the NetworkClient to pass the APIVersion metadata to
>> > > > > >> > the Coordinator, so that the Coordinator can rely on that logic to
>> > > > > >> > change its behavior dynamically.
>> > > > > >>
>> > > > > > The stream thread init could not be supported by a client coordinator
>> > > > > > behavior change on the fly; we are only losing possibilities after we
>> > > > > > have initialized (the main thread exits and no thread has the global
>> > > > > > picture anymore). If we do want to support auto version detection, an
>> > > > > > admin client request should be easier in this sense.
>> > > > > >
>> > > > > >
>> > > > > >> >
>> > > > > >> > 5. I do not have a concrete idea about what impact this would have on
>> > > > > >> > Connect; maybe Randall or Konstantine can help here?
>> > > > > >> >
>> > > > > >>
>> > > > > > Sounds good, let's see their thoughts.
>> > > > > >
>> > > > > >
>> > > > > >> > Guozhang
>> > > > > >> >
>> > > > > >> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero104@gmail.com>
>> > > > > >> > wrote:
>> > > > > >> >
>> > > > > >> > > Hey Jason,
>> > > > > >> > >
>> > > > > >> > > Thank you for the proposal here. Some of my thoughts are below.
>> > > > > >> > >
>> > > > > >> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <jason@confluent.io>
>> > > > > >> > > wrote:
>> > > > > >> > >
>> > > > > >> > > > Hi Boyang,
>> > > > > >> > > >
>> > > > > >> > > > Thanks for picking this up! Still reading through the updates, but here
>> > > > > >> > > > are a couple initial comments on the APIs:
>> > > > > >> > > >
>> > > > > >> > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are
>> > > > > >> > > > trying to encapsulate state from the current group assignment. Maybe
>> > > > > >> > > > something like `ConsumerAssignment` would be clearer? If we make the
>> > > > > >> > > > usage consistent across the consumer and producer, then we can avoid
>> > > > > >> > > > exposing internal state like the generationId.
>> > > > > >> > > >
>> > > > > >> > > > For example:
>> > > > > >> > > >
>> > > > > >> > > > // Public API
>> > > > > >> > > > interface ConsumerAssignment {
>> > > > > >> > > >   Set<TopicPartition> partitions();
>> > > > > >> > > > }
>> > > > > >> > > >
>> > > > > >> > > > // Not a public API
>> > > > > >> > > > class InternalConsumerAssignment implements ConsumerAssignment {
>> > > > > >> > > >   Set<TopicPartition> partitions;
>> > > > > >> > > >   int generationId;
>> > > > > >> > > > }
>> > > > > >> > > >
>> > > > > >> > > > Then we can change the rebalance listener to something like this:
>> > > > > >> > > > onPartitionsAssigned(ConsumerAssignment assignment)
>> > > > > >> > > >
>> > > > > >> > > > And on the producer:
>> > > > > >> > > > void initTransactions(String groupId, ConsumerAssignment assignment);
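Here is a self-contained version of Jason's proposal, with two assumptions:

- Partitions are represented as "topic-partition" strings instead of `TopicPartition`.
- `AssignmentAwareProducer` is a hypothetical producer-side class. It shows how internal code could recover the generation id from the non-public implementation without exposing it on the public interface.

```java
import java.util.Set;

// Public API (as proposed): only the partition set is visible.
interface ConsumerAssignment {
    Set<String> partitions(); // "topic-partition" strings stand in for TopicPartition
}

// Not a public API: carries the generation id alongside the partitions.
final class InternalConsumerAssignment implements ConsumerAssignment {
    private final Set<String> partitions;
    final int generationId;

    InternalConsumerAssignment(Set<String> partitions, int generationId) {
        this.partitions = Set.copyOf(partitions);
        this.generationId = generationId;
    }

    @Override
    public Set<String> partitions() {
        return partitions;
    }
}

// Hypothetical producer side: reads the hidden generation id when the caller
// passes the internal implementation, without it appearing in the public API.
final class AssignmentAwareProducer {
    private String groupId;
    private int generationId = -1; // -1 means "unknown" in this sketch

    void initTransactions(String groupId, ConsumerAssignment assignment) {
        this.groupId = groupId;
        if (assignment instanceof InternalConsumerAssignment) {
            this.generationId = ((InternalConsumerAssignment) assignment).generationId;
        }
    }

    // Illustrative identity combining group and generation.
    String groupGeneration() {
        return groupId + "/" + generationId;
    }
}
```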
>> > > > > >> > > >
>> > > > > >> > > > 2. Another bit of awkwardness is the fact that we have to pass the
>> > > > > >> > > > groupId through both initTransactions() and sendOffsetsToTransaction().
>> > > > > >> > > > We could consider a config instead. Maybe something like
>> > > > > >> > > > `transactional.group.id`? Then we could simplify the producer APIs,
>> > > > > >> > > > potentially even deprecating the current sendOffsetsToTransaction. In
>> > > > > >> > > > fact, for this new usage, the `transactional.id` config is not needed.
>> > > > > >> > > > It would be nice if we didn't have to provide it.
>> > > > > >> > > >
>> > > > > >> > > I like the idea of combining 1 and 2. We could definitely pass in a
>> > > > > >> > > group.id config so that we could avoid exposing that information in a
>> > > > > >> > > public API. The question I have is whether we should name the interface
>> > > > > >> > > `GroupAssignment` instead, so that Connect could later extend the same
>> > > > > >> > > interface, just to echo Guozhang's point here. Also, the base interface
>> > > > > >> > > is better defined empty for easy extension, or we could define an
>> > > > > >> > > abstract type called `Resource` to be shareable later, IMHO.
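One way to read the `Resource` suggestion is a generic assignment interface that consumers and Connect workers could share. All types here are hypothetical and for illustration only: `Resource`, `PartitionResource`, `ConnectTaskResource`, `GroupAssignment` and `Assignments`. This sketch takes the `Resource` route rather than an empty base interface.

```java
import java.util.Set;

// Hypothetical marker for anything a group member can be assigned.
interface Resource {}

// A topic partition as an assignable resource (stand-in for TopicPartition).
final class PartitionResource implements Resource {
    final String topic;
    final int partition;

    PartitionResource(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// A Connect task as an assignable resource.
final class ConnectTaskResource implements Resource {
    final String connector;
    final int task;

    ConnectTaskResource(String connector, int task) {
        this.connector = connector;
        this.task = task;
    }

    @Override
    public String toString() {
        return connector + "#" + task;
    }
}

// Shared assignment abstraction, parameterized by the resource kind.
interface GroupAssignment<R extends Resource> {
    Set<R> resources();
}

final class Assignments {
    // Builds an immutable assignment from the given resources.
    static <R extends Resource> GroupAssignment<R> of(Set<R> resources) {
        Set<R> copy = Set.copyOf(resources);
        return () -> copy;
    }
}
```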
>> > > > > >> > >
>> > > > > >> > >
>> > > > > >> > > > By the way, I'm a bit confused about the discussion above about
>> > > > > >> > > > colocating the txn and group coordinators. That is not actually
>> > > > > >> > > > necessary, is it?
>> > > > > >> > > >
>> > > > > >> > > Yes, this is not a requirement for this KIP, because it is inherently
>> > > > > >> > > impossible to co-locate the topic partitions of the transaction log and
>> > > > > >> > > the consumer offsets topic.
>> > > > > >> > >
>> > > > > >> > >
>> > > > > >> > > > Thanks,
>> > > > > >> > > > Jason
>> > > > > >> > > >
>> > > > > >> > > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero104@gmail.com>
>> > > > > >> > > > wrote:
>> > > > > >> > > >
>> > > > > >> > > > > Thank you, Ismael, for the suggestion. We will attempt to address it by
>> > > > > >> > > > > giving more details in the rejected alternatives section.
>> > > > > >> > > > >
>> > > > > >> > > > > Thank you for the comment, Guozhang! Answers are inline below.
>> > > > > >> > > > >
>> > > > > >> > > > >
>> > > > > >> > > > >
>> > > > > >> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangguoz@gmail.com>
>> > > > > >> > > > > wrote:
>> > > > > >> > > > >
>> > > > > >> > > > > > Hello Boyang,
>> > > > > >> > > > > >
>> > > > > >> > > > > > Thanks for the KIP, I have some comments below:
>> > > > > >> > > > > >
>> > > > > >> > > > > > 1. "Once transactions are complete, the call will return." This seems
>> > > > > >> > > > > > different from the existing behavior, in which we would return a
>> > > > > >> > > > > > retriable CONCURRENT_TRANSACTIONS and let the client retry; is this
>> > > > > >> > > > > > intentional?
>> > > > > >> > > > > >
>> > > > > >> > > > >
>> > > > > >> > > > > I don't think it is intentional, and I will defer this question to Jason
>> > > > > >> > > > > when he has time to answer, since from what I understood, retrying and
>> > > > > >> > > > > holding the request both seem to be valid approaches.
>> > > > > >> > > > >
>> > > > > >> > > > >
>> > > > > >> > > > > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance
>> > > > > >> > > > > > listener interface": as part of KIP-341 we've already added this
>> > > > > >> > > > > > information to the onAssignment callback. Would this be sufficient? Or,
>> > > > > >> > > > > > more generally speaking, which information has to be passed around in
>> > > > > >> > > > > > the rebalance callback, and which can be passed around in the
>> > > > > >> > > > > > PartitionAssignor callback? In Streams, for example, both callbacks are
>> > > > > >> > > > > > used, but the most critical information is passed via onAssignment.
>> > > > > >> > > > > >
>> > > > > >> > > > >
>> > > > > >> > > > > We still need to extend ConsumerRebalanceListener because it's the
>> > > > > >> > > > > interface we have public access to. The #onAssignment call is defined at
>> > > > > >> > > > > the PartitionAssignor level, which is not easy to work with for external
>> > > > > >> > > > > producers.
>> > > > > >> > > > >
>> > > > > >> > > > >
>> > > > > >> > > > > > 3. "We propose to use a separate record type in order to store the group
>> > > > > >> > > > > > assignment.": hmm, I thought with the third typed FindCoordinator, the
>> > > > > >> > > > > > same broker that acts as the consumer coordinator would always be
>> > > > > >> > > > > > selected as the txn coordinator, in which case it can access its local
>> > > > > >> > > > > > cache metadata / offset topic to get this information already? We just
>> > > > > >> > > > > > need to think about how to make these two modules directly exchange
>> > > > > >> > > > > > information without messing up the code hierarchy.
>> > > > > >> > > > > >
>> > > > > >> > > > >
>> > > > > >> > > > > These two coordinators will be on the same broker only when the number of
>> > > > > >> > > > > partitions for the transaction state topic and the consumer offsets topic
>> > > > > >> > > > > is the same. This normally holds true, but I'm afraid we can't make this
>> > > > > >> > > > > assumption?
>> > > > > >> > > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of
>> > > > > >> > > > > > this config is just to avoid old-versioned brokers being unable to
>> > > > > >> > > > > > recognize newer versioned clients. I think if we can do something else
>> > > > > >> > > > > > to avoid this config though, for example we can use the embedded
>> > > > > >> > > > > > AdminClient to send the APIVersion request upon starting up, and based on
>> > > > > >> > > > > > the returned value decide whether to go to the old code path or the new
>> > > > > >> > > > > > behavior. Admittedly, asking a random broker about APIVersion does not
>> > > > > >> > > > > > guarantee the whole cluster's versions, but what we can do is to first
>> > > > > >> > > > > > 1) find the coordinator (and if the random broker does not even
>> > > > > >> > > > > > recognize the new discover type, fall back to the old path directly),
>> > > > > >> > > > > > and then 2) ask the discovered coordinator about its supported
>> > > > > >> > > > > > APIVersion.
>> > > > > >> > > > > >
>> > > > > >> > > > >
>> > > > > >> > > > > The caveat here is that we have to make sure both the group coordinator
>> > > > > >> > > > > and the transaction coordinator are on the latest version during the init
>> > > > > >> > > > > stage. This is potentially doable, as we only need a consumer group.id to
>> > > > > >> > > > > check that. In the meantime, a hard-coded config is still a favorable
>> > > > > >> > > > > backup in case the server has downgraded, so you will want to use a new
>> > > > > >> > > > > version client without `consumer group` transactional support.
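Guozhang's two-step detection, combined with the config fallback and the both-coordinators caveat, might look like the sketch below. The following names are hypothetical: `ClusterProbe`, `CoordinatorType`, `CommitPath`, and the version threshold `MIN_GROUP_AWARE_VERSION`. A real client would presumably derive this information from FindCoordinator and ApiVersions responses.

```java
import java.util.Optional;

// Hypothetical coordinator kinds; TRANSACTION keyed by group id is the proposed third type.
enum CoordinatorType { GROUP, TRANSACTION }

// Hypothetical view of what the client can learn at startup (not a Kafka API).
interface ClusterProbe {
    // Empty if the contacted broker does not recognize the requested lookup type.
    Optional<String> findCoordinator(String groupId, CoordinatorType type);

    // Highest version of the relevant request that the given broker supports.
    int maxSupportedVersion(String broker);
}

enum CommitPath { LEGACY, GROUP_AWARE }

final class StartupDetection {
    // Hypothetical first version that understands group-aware transactions.
    static final int MIN_GROUP_AWARE_VERSION = 3;

    static CommitPath choose(boolean forceLegacy, String groupId, ClusterProbe probe) {
        // A hard-coded config wins, e.g. to stay safe if brokers may be downgraded.
        if (forceLegacy) {
            return CommitPath.LEGACY;
        }
        // Both the group coordinator and the transaction coordinator must be new enough.
        for (CoordinatorType type : CoordinatorType.values()) {
            // Step 1: discover the coordinator; an unrecognized lookup type means old brokers.
            Optional<String> coordinator = probe.findCoordinator(groupId, type);
            if (coordinator.isEmpty()) {
                return CommitPath.LEGACY;
            }
            // Step 2: ask the discovered coordinator itself what it supports.
            if (probe.maxSupportedVersion(coordinator.get()) < MIN_GROUP_AWARE_VERSION) {
                return CommitPath.LEGACY;
            }
        }
        return CommitPath.GROUP_AWARE;
    }
}
```

As Guozhang notes in point 4 of his later message, a startup check like this cannot react to a broker downgrade that happens after initialization.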
>> > > > > >> > > > >
>> > > > > >> > > > > > 5. This is a meta question: have you considered how this can be applied
>> > > > > >> > > > > > to Kafka Connect as well? For example, for source connectors, the
>> > > > > >> > > > > > assignment is not by "partitions" but by some other sort of "resources"
>> > > > > >> > > > > > based on the source systems; how would KIP-447 affect Kafka Connectors
>> > > > > >> > > > > > that implement EOS as well?
>> > > > > >> > > > > >
>> > > > > >> > > > >
>> > > > > >> > > > > No, it's not currently included in the scope. Could you point me to a
>> > > > > >> > > > > sample source connector that uses EOS? We could always piggy-back on the
>> > > > > >> > > > > TxnProducerIdentity struct with more information such as tasks. If this
>> > > > > >> > > > > is something to support in the near term, an abstract type called
>> > > > > >> > > > > "Resource" could be provided, and topic partitions and Connect tasks
>> > > > > >> > > > > could implement it.
>> > > > > >> > > > >
>> > > > > >> > > > >
>> > > > > >> > > > > >
>> > > > > >> > > > > > Guozhang
>> > > > > >> > > > > >
>> > > > > >> > > > > >
>> > > > > >> > > > > > On Sat, Jun 22, 2019 at 8:40
PM Ismael Juma <
>> > > > ismael@juma.me.uk>
>> > > > > >> > > wrote:
>> > > > > >> > > > > >
>> > > > > >> > > > > > > Hi Boyang,
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > Thanks for the KIP. It's good that we listed a number of rejected
>> > > > > >> > > > > > > alternatives. It would be helpful to have an explanation of why they
>> > > > > >> > > > > > > were rejected.
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > Ismael
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bchen11@outlook.com>
>> > > > > >> > > > > > > wrote:
>> > > > > >> > > > > > >
>> > > > > >> > > > > > > > Hey all,
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > > > I would like to start a discussion for KIP-447:
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > > > This is work originated by Jason Gustafson, and we would like to
>> > > > > >> > > > > > > > proceed to the discussion stage.
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > > > Let me know your thoughts, thanks!
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > > > Boyang
>> > > > > >> > > > > > > >
>> > > > > >> > > > > > >
>> > > > > >> > > > > >
>> > > > > >> > > > > >
>> > > > > >> > > > > > --
>> > > > > >> > > > > > -- Guozhang
>> > > > > >> > > > > >
>> > > > > >> > > > >
>> > > > > >> > > >
>> > > > > >> > >
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > --
>> > > > > >> > -- Guozhang
>> > > > > >> >
>> > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
