kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-250 Add Support for Quorum-based Producer Acknowledgment
Date Mon, 05 Mar 2018 06:38:57 GMT
Hi Litao,

When acks=all is used, we only require all ISR replicas to ack before
returning; your understanding of this part is right.

What I meant is that with acks=all, for the above example with {A, B, C},
if the ISR list still contains all three replicas, we will still require
ALL 3 of them to ack even if min.isr is only set to 2. So from A's point of
view, before she shrinks the ISR from {A, B, C} to {A, B} she cannot
respond to a produce request with acks=all; and when she tries to do that,
she will get an error from ZK telling her that the leader epoch is stale,
hence learning that she's no longer the leader. So with acks=all, even when
both A and B are partitioned off, we are still safe.

But with your proposal of acks=quorum, in the same case, A can actually
respond to a produce request with ISR = {A, B, C} while the produced message
is only acked by A herself and B, because min.isr = 2 and hence that is
enough for a quorum. This will lead to the error case I described above
when the partition heals.
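
As a rough illustration of the difference, here is a minimal Python sketch
of the two ack rules as they are stated in this thread. It is not Kafka's
broker code: the helper name can_ack and all of its parameters are
hypothetical, and it models only the counting rule, not epochs or ZK.

```python
def can_ack(acks, isr, acked, num_replicas, min_isr):
    """Return True if the leader may acknowledge a produce request.

    Hypothetical helper: models only the ack rules discussed in this thread.
    """
    isr = set(isr)
    acked_in_isr = set(acked) & isr
    if acks == "all":
        # Every ISR member must have the record, and the ISR itself
        # must not have shrunk below min.isr.
        return len(isr) >= min_isr and acked_in_isr == isr
    if acks == "quorum":
        # Enough ISR members must have the record: at least a majority
        # of all replicas AND at least min.isr.
        majority = num_replicas // 2 + 1
        return len(acked_in_isr) >= max(majority, min_isr)
    raise ValueError(f"unsupported acks value: {acks!r}")


# A still believes ISR = {A, B, C}; only A and B have the record.
stale_isr = {"A", "B", "C"}
acked = {"A", "B"}
all_ok = can_ack("all", stale_isr, acked, num_replicas=3, min_isr=2)
quorum_ok = can_ack("quorum", stale_isr, acked, num_replicas=3, min_isr=2)
print(all_ok, quorum_ok)  # False True
```

With the stale ISR, "all" blocks until A shrinks the ISR (which is where
the ZK write fails), while "quorum" acks immediately.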

-------------

Besides this question, another issue is how the LEO-based leader election
will be implemented, which is still not covered in detail in your KIP wiki.
As discussed with Becket offline, one issue is that today the leader
election on the controller side is based on the synchronization barrier
provided by ZK, in the sense that the controller will deterministically
choose the leader from ZK's registered metadata, which is guaranteed not to
be changed concurrently. However, if we now implement the new LEO-based
election based on, say, round-trip communication between the controller
and the brokers, there is no such synchronization barrier. So, for example,
right after the controller gets the LEOs from A and B and determines that A
has the larger value, B may get a new batch of messages concurrently, so
that its LEO is incremented beyond A's. In this case the controller will
choose a leader that does not actually have the largest LEO, and hence we
will have the same risk that some acked messages are truncated later.
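
The race can be sketched in a few lines of Python. This is only a toy
model under stated assumptions: the LEO values are made up, elect_by_leo is
a hypothetical stand-in for the controller's choice, and the "round trip"
is just a dictionary copy taken before B appends more records.

```python
def elect_by_leo(leos):
    """Pick the replica with the largest LEO (ties broken by name)."""
    return max(sorted(leos), key=lambda replica: leos[replica])


# Live log end offsets on each broker (illustrative numbers only).
leo = {"A": 105, "B": 100}

# Controller collects LEOs via RPC; this snapshot is stale as soon as
# any broker appends again.
snapshot = dict(leo)

# B receives a new batch after replying to the controller.
leo["B"] += 10

chosen = elect_by_leo(snapshot)       # decision made on the stale snapshot
actual_longest = elect_by_leo(leo)    # who really has the longest log now
at_risk = leo[actual_longest] - leo[chosen]
print(chosen, actual_longest, at_risk)  # A B 5
```

Without a barrier that stops appends while the LEOs are being compared,
the records counted in at_risk are the ones that could be truncated.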

I'm wondering if you have already thought about this case and, if so, could
you complete the KIP wiki with a detailed description of the new leader
election implementation design?


Guozhang


On Mon, Feb 12, 2018 at 3:39 PM, Litao Deng <litao.deng@airbnb.com.invalid>
wrote:

> Hey Guozhang. I don't fully understand the following comment of yours:
>
> > The goal of my approach is to maintain the behavior of ack="all", which
> > happens to do better than what Kafka actually guarantees: when both A
> > and B are partitioned off, produced records will not be acked since
> > "all" requires all replicas (not only ISRs, my previous email has an
> > incorrect term).
>
>
> My understanding is that once the leader's HW exceeds the required offset,
> it will check whether the current ISR size is equal to or larger than
> min.isr. As for the HW, it will increase once all of the ISR replicas have
> replicated the message. So I am a little bit confused about "requires all
> replicas (not only ISRs, my previous email has an incorrect term)".
>
> Do you mean all of the replicas (ISR and the out of sync replicas) should
> commit the message once acks='all'?
>
> Thanks!
>
> On Mon, Feb 12, 2018 at 3:02 PM, Litao Deng <litao.deng@airbnb.com> wrote:
>
> > Folks. Thanks for all of the good discussions.
> >
> > Here are a few of my thoughts:
> >
> >    1. we won't change any existing semantics. That means, besides acks
> >    '-1 (all)', '0', '1', we will introduce a separate 'quorum' and
> >    document the semantics. 'quorum' is a totally different view of our
> >    replication protocol for the sake of better tail (P999) latency. I
> >    will make clear that 'quorum' should not be compared with '-1 (all)'
> >    or any other existing value.
> >    2. in terms of the controller functionality, I admit there are many
> >    awesome consensus protocols; however, for this specific KIP, I choose
> >    to minimize the impact/change on the controller code path.
> >       - we will keep the current controller's overall design and
> >       implementation by NOT introducing any consensus protocol.
> >       - we will introduce a topic-level config "enable.quorum.acks"
> >       (default to false), and only accept acks='quorum' produce requests
> >       while the corresponding topic has this config enabled. During new
> >       leader election, we will only use the new LEO-based leader
> >       election for the topics that have "enable.quorum.acks" turned on,
> >       so other topics won't pay the penalty.
> >
> > One requirement for this KIP is to be fully backward compatible in
> > semantics and pay-as-you-go for the added controller complexity (longer
> > new leader election latency).
> >
> > Thoughts?
> >
> > On Mon, Feb 12, 2018 at 10:19 AM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> >
> >> Hello Tao,
> >>
> >> No, I was not proposing to change the mechanism of acks=all; I was only
> >> mentioning that today, even with acks=all, the tolerance of failures is
> >> theoretically still bounded by the min.isr setting, though we do require
> >> all replicas in ISR (which may be more than min.isr) to replicate before
> >> responding; this is what Jun mentioned may surprise many users today. I
> >> think an additional "acks=quorum" can help resolve this, by requiring
> >> num.acks >= majority (to make sure consistency is guaranteed with at
> >> most (X-1) / 2 failures with X replicas) AND num.acks >= min.isr (to
> >> specify if we want to tolerate more failures than (X-1) / 2).
> >>
> >> The question then is whether acks=all is still useful once "quorum" is
> >> introduced: if it is not, we can just replace the current semantics of
> >> "all" and document it. The example that we gave above demonstrates that
> >> "acks=all" itself may still be useful even with the introduction of
> >> "quorum", since that scenario can be avoided by acks=all but not by
> >> acks=quorum: acks=all requires ALL ISR replicas to replicate even if
> >> that number is larger than {min.isr} and also larger than the majority
> >> (and if A tries to shrink its ISR from {A,B,C} to {A,B}, the ZK write
> >> will fail since the epoch has been incremented). Hence my proposal is to
> >> add a new config value rather than replacing the current semantics of
> >> "all".
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Sat, Feb 3, 2018 at 2:45 AM, tao xiao <xiaotao183@gmail.com> wrote:
> >>
> >> > Hi Guozhang,
> >> >
> >> > Are you proposing changing the semantics of ack=all to acknowledge a
> >> > message only after all replicas (not all ISRs, which is what Kafka is
> >> > currently doing) have committed the message? This is equivalent to
> >> > setting min.isr=number of replicas, which makes ack=all much stricter
> >> > than what Kafka has right now. I think this may surprise users too,
> >> > as the producer will not succeed in producing a message to Kafka when
> >> > one of the followers is down.
> >> >
> >> > On Sat, 3 Feb 2018 at 15:26 Guozhang Wang <wangguoz@gmail.com> wrote:
> >> >
> >> > > Hi Dong,
> >> > >
> >> > > Could you elaborate a bit more on how the controller could get
> >> > > leaders to switch between all and quorum?
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > > On Fri, Feb 2, 2018 at 10:12 PM, Dong Lin <lindong28@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hey Guozhang,
> >> > > >
> >> > > > Got it. Thanks for the detailed explanation. I guess my point is
> >> > > > that we can probably achieve the best of both worlds, i.e. maintain
> >> > > > the existing behavior of ack="all" while improving the tail latency.
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Fri, Feb 2, 2018 at 8:43 PM, Guozhang Wang <wangguoz@gmail.com
> >
> >> > > wrote:
> >> > > >
> >> > > >> Hi Dong,
> >> > > >>
> >> > > >> Yes, in terms of fault tolerance "quorum" does not do better than
> >> > > >> "all"; as I said, with {min.isr} set to X+1 Kafka is able to
> >> > > >> tolerate X failures only. So if A and B are partitioned off at the
> >> > > >> same time, then there are two concurrent failures and we do not
> >> > > >> guarantee all acked messages will be retained.
> >> > > >>
> >> > > >> The goal of my approach is to maintain the behavior of ack="all",
> >> > > >> which happens to do better than what Kafka actually guarantees:
> >> > > >> when both A and B are partitioned off, produced records will not
> >> > > >> be acked since "all" requires all replicas (not only ISRs, my
> >> > > >> previous email has an incorrect term). This is doing better than
> >> > > >> tolerating X failures, which I was proposing to keep, so that we
> >> > > >> would not introduce any regression "surprises" to users who are
> >> > > >> already using "all". In other words, "quorum" is trading a bit of
> >> > > >> failure tolerance that is strictly defined on min.isr for better
> >> > > >> tail latency.
> >> > > >>
> >> > > >>
> >> > > >> Guozhang
> >> > > >>
> >> > > >>
> >> > > >> On Fri, Feb 2, 2018 at 6:25 PM, Dong Lin <lindong28@gmail.com>
> >> wrote:
> >> > > >>
> >> > > >>> Hey Guozhang,
> >> > > >>>
> >> > > >>> According to the new proposal, with 3 replicas, min.isr=2
and
> >> > > >>> acks="quorum", it seems that acknowledged messages can
still be
> >> > > truncated
> >> > > >>> in the network partition scenario you mentioned, right?
So I
> guess
> >> > the
> >> > > goal
> >> > > >>> is for some user to achieve better tail latency at the
cost of
> >> > > potential
> >> > > >>> message loss?
> >> > > >>>
> >> > > >>> If this is the case, then I think it may be better to
adopt an
> >> > approach
> >> > > >>> where controller dynamically turn on/off this optimization.
This
> >> > > provides
> >> > > >>> user with peace of mind (i.e. no message loss) while
still
> >> reducing
> >> > > tail
> >> > > >>> latency. What do you think?
> >> > > >>>
> >> > > >>> Thanks,
> >> > > >>> Dong
> >> > > >>>
> >> > > >>>
> >> > > >>> On Fri, Feb 2, 2018 at 11:11 AM, Guozhang Wang <
> >> wangguoz@gmail.com>
> >> > > >>> wrote:
> >> > > >>>
> >> > > >>>> Hello Litao,
> >> > > >>>>
> >> > > >>>> Just double checking on the leader election details,
do you
> have
> >> > time
> >> > > >>>> to complete the proposal on that part?
> >> > > >>>>
> >> > > >>>> Also Jun mentioned one caveat related to KIP-250
on the KIP-232
> >> > > >>>> discussion thread that Dong is working on, I figured
it is
> worth
> >> > > pointing
> >> > > >>>> out here with a tentative solution:
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> ```
> >> > > >>>> Currently, if the producer uses acks=-1, a write
will only
> >> succeed
> >> > if
> >> > > >>>> the write is received by all in-sync replicas (i.e.,
> committed).
> >> > This
> >> > > >>>> is true even when min.isr is set since we first wait
for a
> >> message
> >> > to
> >> > > >>>> be committed and then check the min.isr requirement.
KIP-250
> may
> >> > > >>>> change that, but we can discuss the implication there.
> >> > > >>>> ```
> >> > > >>>>
> >> > > >>>> The caveat is that, if we change the acking semantics
in
> KIP-250
> >> > that
> >> > > >>>> we will only requires num of {min.isr} to acknowledge
a
> produce,
> >> > then
> >> > > the
> >> > > >>>> above scenario will have a caveat: imagine you have
{A, B, C}
> >> > > replicas of a
> >> > > >>>> partition with A as the leader, all in the isr list,
and
> min.isr
> >> is
> >> > 2.
> >> > > >>>>
> >> > > >>>> 1. Say there is a network partition and both A and
B are fenced
> >> > off. C
> >> > > >>>> is elected as the new leader, it shrinks its isr
list to only
> >> {C};
> >> > > from A's
> >> > > >>>> point of view it does not know it becomes the "ghost"
and no
> >> longer
> >> > > the
> >> > > >>>> leader, all it does is shrinking the isr list to
{A, B}.
> >> > > >>>>
> >> > > >>>> 2. At this time, any new writes with ack=-1 to C
will not be
> >> acked,
> >> > > >>>> since from C's pov there is only one replica. This
is correct.
> >> > > >>>>
> >> > > >>>> 3. However, any writes that are send to A (NOTE this
is totally
> >> > > >>>> possible, since producers would only refresh metadata
> >> periodically,
> >> > > >>>> additionally if they happen to ask A or B they will
get the
> stale
> >> > > metadata
> >> > > >>>> that A's still the leader), since A thinks that isr
list is {A,
> >> B}
> >> > > and as
> >> > > >>>> long as B has replicated the message, A can acked
the produce.
> >> > > >>>>
> >> > > >>>>     This is not correct behavior, since when network
heals, A
> >> would
> >> > > >>>> realize it is not the leader and will truncate its
log. And
> hence
> >> > as a
> >> > > >>>> result the acked records are lost, violating Kafka's
> guarantees.
> >> And
> >> > > >>>> KIP-232 would not help preventing this scenario.
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> Although one can argue that, with 3 replicas and
min.isr set to
> >> 2,
> >> > > >>>> Kafka is guaranteeing to tolerate only one failure,
while the
> >> above
> >> > > >>>> scenario is actually two concurrent failures (both
A and B are
> >> > > considered
> >> > > >>>> wedged), this is still a regression to the current
version.
> >> > > >>>>
> >> > > >>>> So to resolve this issue, I'd propose we can change
the
> >> semantics in
> >> > > >>>> the following way (this is only slightly different
from your
> >> > > proposal):
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> 1. Add one more value to client-side acks config:
> >> > > >>>>
> >> > > >>>>    0: no acks needed at all.
> >> > > >>>>    1: ack from the leader.
> >> > > >>>>    all: ack from ALL the ISR replicas AND that current
number
> of
> >> isr
> >> > > >>>> replicas has to be no smaller than {min.isr} (i.e.
not changing
> >> this
> >> > > >>>> semantic).
> >> > > >>>>    quorum: this is the new value, it requires ack
from enough
> >> number
> >> > > of
> >> > > >>>> ISR replicas no smaller than majority of the replicas
AND no
> >> smaller
> >> > > than
> >> > > >>>> {min.isr}.
> >> > > >>>>
> >> > > >>>> 2. Clarify in the docs that if a user wants to tolerate
X
> >> failures,
> >> > > she
> >> > > >>>> needs to set client acks=all or acks=quorum (better
tail
> latency
> >> > than
> >> > > >>>> "all") with broker {min.isr} to be X+1; however,
"all" is not
> >> > > necessarily
> >> > > >>>> stronger than "quorum":
> >> > > >>>>
> >> > > >>>> For example, with 3 replicas, and {min.isr} set to
2. Here is a
> >> list
> >> > > of
> >> > > >>>> scenarios:
> >> > > >>>>
> >> > > >>>> a. ISR list has 3: "all" waits for all 3, "quorum"
waits for 2
> of
> >> > > them.
> >> > > >>>> b. ISR list has 2: "all" and "quorum" waits for both
2 of them.
> >> > > >>>> c. ISR list has 1: "all" and "quorum" would not ack.
> >> > > >>>>
> >> > > >>>> If {min.isr} is set to 1, interestingly, here would
be the list
> >> of
> >> > > >>>> scenarios:
> >> > > >>>>
> >> > > >>>> a. ISR list has 3: "all" waits for all 3, "quorum"
waits for 2
> of
> >> > > them.
> >> > > >>>> b. ISR list has 2: "all" and "quorum" waits for both
2 of them.
> >> > > >>>> c. ISR list has 1: "all" waits for leader to return,
while
> >> "quorum"
> >> > > >>>> would not ack (because it requires that number >
{min.isr}, AND
> >> >=
> >> > > >>>> {majority of num.replicas}, so its actually stronger
than
> "all").
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> WDYT?
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> Guozhang
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindong28@gmail.com
> >
> >> > > wrote:
> >> > > >>>>
> >> > > >>>>> Hey Litao,
> >> > > >>>>>
> >> > > >>>>> Not sure there will be an easy way to select
the broker with
> >> > highest
> >> > > >>>>> LEO
> >> > > >>>>> without losing acknowledged message. In case
it is useful,
> here
> >> is
> >> > > >>>>> another
> >> > > >>>>> idea. Maybe we can have a mechanism to turn switch
between the
> >> > > min.isr
> >> > > >>>>> and
> >> > > >>>>> isr set for determining when to acknowledge a
message.
> >> Controller
> >> > can
> >> > > >>>>> probably use RPC to request the current leader
to use isr set
> >> > before
> >> > > it
> >> > > >>>>> sends LeaderAndIsrRequest for leadership change.
> >> > > >>>>>
> >> > > >>>>> Regards,
> >> > > >>>>> Dong
> >> > > >>>>>
> >> > > >>>>>
> >> > > >>>>> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng
> >> > > >>>>> <litao.deng@airbnb.com.invalid>
> >> > > >>>>> wrote:
> >> > > >>>>>
> >> > > >>>>> > Thanks Jun for the detailed feedback.
> >> > > >>>>> >
> >> > > >>>>> > Yes, for #1, I mean the live replicas from
the ISR.
> >> > > >>>>> >
> >> > > >>>>> > Actually, I believe for all of the 4 new
leader election
> >> > strategies
> >> > > >>>>> > (offline, reassign, preferred replica and
controlled
> >> shutdown),
> >> > we
> >> > > >>>>> need to
> >> > > >>>>> > make corresponding changes. Will document
the details in the
> >> KIP.
> >> > > >>>>> >
> >> > > >>>>> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao
<jun@confluent.io>
> >> > wrote:
> >> > > >>>>> >
> >> > > >>>>> > > Hi, Litao,
> >> > > >>>>> > >
> >> > > >>>>> > > Thanks for the KIP. Good proposal.
A few comments below.
> >> > > >>>>> > >
> >> > > >>>>> > > 1. The KIP says "select the live replica
with the largest
> >> LEO".
> >> > > I
> >> > > >>>>> guess
> >> > > >>>>> > > what you meant is selecting the live
replicas in ISR with
> >> the
> >> > > >>>>> largest
> >> > > >>>>> > LEO?
> >> > > >>>>> > >
> >> > > >>>>> > > 2. I agree that we can probably just
reuse the current
> >> min.isr
> >> > > >>>>> > > configuration, but with a slightly
different semantics.
> >> > > Currently,
> >> > > >>>>> if
> >> > > >>>>> > > min.isr is set, a user expects the
record to be in at
> least
> >> > > min.isr
> >> > > >>>>> > > replicas on successful ack. This KIP
guarantees this too.
> >> Most
> >> > > >>>>> people are
> >> > > >>>>> > > probably surprised that currently the
ack is only sent
> back
> >> > after
> >> > > >>>>> all
> >> > > >>>>> > > replicas in ISR receive the record.
This KIP will change
> the
> >> > ack
> >> > > >>>>> to only
> >> > > >>>>> > > wait on min.isr replicas, which matches
the user's
> >> expectation
> >> > > and
> >> > > >>>>> gives
> >> > > >>>>> > > better latency. Currently, we guarantee
no data loss if
> >> there
> >> > are
> >> > > >>>>> fewer
> >> > > >>>>> > > than replication factor failures. The
KIP changes that to
> >> fewer
> >> > > >>>>> than
> >> > > >>>>> > > min.isr failures. The latter probably
matches the user
> >> > > expectation.
> >> > > >>>>> > >
> >> > > >>>>> > > 3. I agree that the new leader election
process is a bit
> >> more
> >> > > >>>>> > complicated.
> >> > > >>>>> > > The controller now needs to contact
all replicas in ISR to
> >> > > >>>>> determine who
> >> > > >>>>> > > has the longest log. However, this
happens infrequently.
> So,
> >> > it's
> >> > > >>>>> > probably
> >> > > >>>>> > > worth doing for the better latency
in #2.
> >> > > >>>>> > >
> >> > > >>>>> > > 4. We have to think through the preferred
leader election
> >> > > process.
> >> > > >>>>> > > Currently, the first assigned replica
is preferred for
> load
> >> > > >>>>> balancing.
> >> > > >>>>> > > There is a process to automatically
move the leader to the
> >> > > >>>>> preferred
> >> > > >>>>> > > replica when it's in sync. The issue
is that the preferred
> >> > > replica
> >> > > >>>>> may no
> >> > > >>>>> > > be the replica with the longest log.
Naively switching to
> >> the
> >> > > >>>>> preferred
> >> > > >>>>> > > replica may cause data loss when there
are actually fewer
> >> > > failures
> >> > > >>>>> than
> >> > > >>>>> > > configured min.isr. One way to address
this issue is to do
> >> the
> >> > > >>>>> following
> >> > > >>>>> > > steps during preferred leader election:
(a) controller
> >> sends an
> >> > > RPC
> >> > > >>>>> > request
> >> > > >>>>> > > to the current leader; (b) the current
leader stops taking
> >> new
> >> > > >>>>> writes
> >> > > >>>>> > > (sending a new error code to the clients)
and returns its
> >> LEO
> >> > > >>>>> (call it L)
> >> > > >>>>> > > to the controller; (c) the controller
issues an RPC
> request
> >> to
> >> > > the
> >> > > >>>>> > > preferred replica and waits its LEO
to reach L; (d) the
> >> > > controller
> >> > > >>>>> > changes
> >> > > >>>>> > > the leader to the preferred replica.
> >> > > >>>>> > >
> >> > > >>>>> > > Jun
> >> > > >>>>> > >
> >> > > >>>>> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao
Deng
> >> > > >>>>> > <litao.deng@airbnb.com.invalid
> >> > > >>>>> > > >
> >> > > >>>>> > > wrote:
> >> > > >>>>> > >
> >> > > >>>>> > > > Sorry folks, just realized I didn't
use the correct
> thread
> >> > > >>>>> format for
> >> > > >>>>> > the
> >> > > >>>>> > > > discussion. I started this new
one and copied all of the
> >> > > >>>>> responses from
> >> > > >>>>> > > the
> >> > > >>>>> > > > old one.
> >> > > >>>>> > > >
> >> > > >>>>> > > > @Dong
> >> > > >>>>> > > > It makes sense to just use the
min.insync.replicas
> >> instead of
> >> > > >>>>> > > introducing a
> >> > > >>>>> > > > new config, and we must make this
change together with
> the
> >> > > >>>>> LEO-based
> >> > > >>>>> > new
> >> > > >>>>> > > > leader election.
> >> > > >>>>> > > >
> >> > > >>>>> > > > @Xi
> >> > > >>>>> > > > I thought about embedding the
LEO information to the
> >> > > >>>>> ControllerContext,
> >> > > >>>>> > > > didn't find a way. Using RPC will
make the leader
> election
> >> > > period
> >> > > >>>>> > longer
> >> > > >>>>> > > > and this should happen in very
rare cases (broker
> failure,
> >> > > >>>>> controlled
> >> > > >>>>> > > > shutdown, preferred leader election
and partition
> >> > > reassignment).
> >> > > >>>>> > > >
> >> > > >>>>> > > > @Jeff
> >> > > >>>>> > > > The current leader election is
to pick the first replica
> >> from
> >> > > AR
> >> > > >>>>> which
> >> > > >>>>> > > > exists both in the live brokers
and ISR sets. I agree
> with
> >> > you
> >> > > >>>>> about
> >> > > >>>>> > > > changing the current/default behavior
will cause many
> >> > > >>>>> confusions, and
> >> > > >>>>> > > > that's the reason the title is
"Add Support ...". In
> this
> >> > case,
> >> > > >>>>> we
> >> > > >>>>> > > wouldn't
> >> > > >>>>> > > > break any current promises and
provide a separate option
> >> for
> >> > > our
> >> > > >>>>> user.
> >> > > >>>>> > > > In terms of KIP-250, I feel it
is more like the
> >> > > "Semisynchronous
> >> > > >>>>> > > > Replication" in the MySQL world,
and yes it is something
> >> > > between
> >> > > >>>>> acks=1
> >> > > >>>>> > > and
> >> > > >>>>> > > > acks=insync.replicas. Additionally,
I feel KIP-250 and
> >> > KIP-227
> >> > > >>>>> are
> >> > > >>>>> > > > two orthogonal improvements. KIP-227
is to improve the
> >> > > >>>>> replication
> >> > > >>>>> > > protocol
> >> > > >>>>> > > > (like the introduction of parallel
replication in
> MySQL),
> >> and
> >> > > >>>>> KIP-250
> >> > > >>>>> > is
> >> > > >>>>> > > an
> >> > > >>>>> > > > enhancement for the replication
architecture (sync,
> >> > semi-sync,
> >> > > >>>>> and
> >> > > >>>>> > > async).
> >> > > >>>>> > > >
> >> > > >>>>> > > >
> >> > > >>>>> > > > Dong Lin
> >> > > >>>>> > > >
> >> > > >>>>> > > > > Thanks for the KIP. I have
one quick comment before
> you
> >> > > >>>>> provide more
> >> > > >>>>> > > > detail
> >> > > >>>>> > > > > on how to select the leader
with the largest LEO.
> >> > > >>>>> > > > > Do you think it would make
sense to change the default
> >> > > >>>>> behavior of
> >> > > >>>>> > > > acks=-1,
> >> > > >>>>> > > > > such that broker will acknowledge
the message once the
> >> > > message
> >> > > >>>>> has
> >> > > >>>>> > been
> >> > > >>>>> > > > > replicated to min.insync.replicas
brokers? This would
> >> allow
> >> > > us
> >> > > >>>>> to
> >> > > >>>>> > keep
> >> > > >>>>> > > > the
> >> > > >>>>> > > > > same durability guarantee,
improve produce request
> >> latency
> >> > > >>>>> without
> >> > > >>>>> > > > having a
> >> > > >>>>> > > > > new config.
> >> > > >>>>> > > >
> >> > > >>>>> > > >
> >> > > >>>>> > > > Hu Xi
> >> > > >>>>> > > >
> >> > > >>>>> > > > > Currently,  with holding
the assigned replicas(AR) for
> >> all
> >> > > >>>>> > partitions,
> >> > > >>>>> > > > > controller is now able to
elect new leaders by
> selecting
> >> > the
> >> > > >>>>> first
> >> > > >>>>> > > > replica
> >> > > >>>>> > > > > of AR which occurs in both
live replica set and ISR.
> If
> >> > > >>>>> switching to
> >> > > >>>>> > > the
> >> > > >>>>> > > > > LEO-based strategy, controller
context might need to
> be
> >> > > >>>>> enriched or
> >> > > >>>>> > > > > augmented to store those
values.  If retrieving those
> >> LEOs
> >> > > >>>>> real-time,
> >> > > >>>>> > > > > several rounds of RPCs are
unavoidable which seems to
> >> > violate
> >> > > >>>>> the
> >> > > >>>>> > > > original
> >> > > >>>>> > > > > intention of this KIP.
> >> > > >>>>> > > >
> >> > > >>>>> > > >
> >> > > >>>>> > > > Jeff Widman
> >> > > >>>>> > > >
> >> > > >>>>> > > > > I agree with Dong, we should
see if it's possible to
> >> change
> >> > > the
> >> > > >>>>> > default
> >> > > >>>>> > > > > behavior so that as soon
as min.insync.replicas
> brokers
> >> > > >>>>> respond than
> >> > > >>>>> > > the
> >> > > >>>>> > > > > broker acknowledges the message
back to the client
> >> without
> >> > > >>>>> waiting
> >> > > >>>>> > for
> >> > > >>>>> > > > > additional brokers who are
in the in-sync replica list
> >> to
> >> > > >>>>> respond. (I
> >> > > >>>>> > > > > actually thought it already
worked this way).
> >> > > >>>>> > > > > As you implied in the KIP
though, changing this
> default
> >> > > >>>>> introduces a
> >> > > >>>>> > > > weird
> >> > > >>>>> > > > > state where an in-sync follower
broker is not
> >> guaranteed to
> >> > > >>>>> have a
> >> > > >>>>> > > > > message...
> >> > > >>>>> > > > > So at a minimum, the leadership
failover algorithm
> would
> >> > need
> >> > > >>>>> to be
> >> > > >>>>> > > sure
> >> > > >>>>> > > > to
> >> > > >>>>> > > > > pick the most up-to-date
follower... I thought it
> >> already
> >> > did
> >> > > >>>>> this?
> >> > > >>>>> > > > > But if multiple brokers fail
in quick succession,
> then a
> >> > > >>>>> broker that
> >> > > >>>>> > > was
> >> > > >>>>> > > > in
> >> > > >>>>> > > > > the ISR could become a leader
without ever receiving
> the
> >> > > >>>>> message...
> >> > > >>>>> > > > > violating the current promises
of
> >> unclean.leader.election.
> >> > > >>>>> > > > enable=False...
> >> > > >>>>> > > > > so changing the default might
be not be a tenable
> >> solution.
> >> > > >>>>> > > > > What also jumped out at me
in the KIP was the goal of
> >> > > reducing
> >> > > >>>>> p999
> >> > > >>>>> > > when
> >> > > >>>>> > > > > setting replica lag time
at 10 seconds(!!)... I
> >> understand
> >> > > the
> >> > > >>>>> desire
> >> > > >>>>> > > to
> >> > > >>>>> > > > > minimize frequent ISR shrink/expansion,
as I face this
> >> same
> >> > > >>>>> issue at
> >> > > >>>>> > my
> >> > > >>>>> > > > day
> >> > > >>>>> > > > > job. But what you're essentially
trying to do here is
> >> > create
> >> > > an
> >> > > >>>>> > > > additional
> >> > > >>>>> > > > > replication state that is
in-between acks=1 and acks =
> >> ISR
> >> > to
> >> > > >>>>> paper
> >> > > >>>>> > > over
> >> > > >>>>> > > > a
> >> > > >>>>> > > > > root problem of ISR shrink/expansion...
> >> > > >>>>> > > > > I'm just wary of shipping
more features (and more
> >> > operational
> >> > > >>>>> > > confusion)
> >> > > >>>>> > > > if
> >> > > >>>>> > > > > it's only addressing the
symptom rather than the root
> >> > cause.
> >> > > >>>>> For
> >> > > >>>>> > > example,
> >> > > >>>>> > > > > my day job's problem is we
run a very high number of
> >> > > >>>>> low-traffic
> >> > > >>>>> > > > > partitions-per-broker, so
the fetch requests hit many
> >> > > >>>>> partitions
> >> > > >>>>> > before
> >> > > >>>>> > > > > they fill. Solving that requires
changing our
> >> architecture
> >> > +
> >> > > >>>>> making
> >> > > >>>>> > the
> >> > > >>>>> > > > > replication protocol more
efficient (KIP-227).
> >> > > >>>>> > > >
> >> > > >>>>> > > >
> >> > > >>>>> > > > On Tue, Jan 23, 2018 at 10:02
PM, Litao Deng <
> >> > > >>>>> litao.deng@airbnb.com>
> >> > > >>>>> > > > wrote:
> >> > > >>>>> > > >
> >> > > >>>>> > > > > Hey folks. I would like to
add a feature to support
> the
> >> > > >>>>> quorum-based
> >> > > >>>>> > > > > acknowledgment for the producer
request. We have been
> >> > > running a
> >> > > >>>>> > > modified
> >> > > >>>>> > > > > version of Kafka on our testing
cluster for weeks, the
> >> > > >>>>> improvement of
> >> > > >>>>> > > > P999
> >> > > >>>>> > > > > is significant with very
stable latency.
> Additionally, I
> >> > > have a
> >> > > >>>>> > > proposal
> >> > > >>>>> > > > to
> >> > > >>>>> > > > > achieve a similar data durability
as with the
> >> > > >>>>> insync.replicas-based
> >> > > >>>>> > > > > acknowledgment through LEO-based
leader election.
> >> > > >>>>> > > > >
> >> > > >>>>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge
> >> > > >>>>> > > > >
> >> > > >>>>> > > >
> >> > > >>>>> > >
> >> > > >>>>> >
> >> > > >>>>>
> >> > > >>>>
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> --
> >> > > >>>> -- Guozhang
> >> > > >>>>
> >> > > >>>
> >> > > >>>
> >> > > >>
> >> > > >>
> >> > > >> --
> >> > > >> -- Guozhang
> >> > > >>
> >> > > >
> >> > > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
