kafka-dev mailing list archives

From Bruno Cadonna <br...@confluent.io>
Subject Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams
Date Wed, 04 Sep 2019 11:03:28 GMT
Hi,

1) What do you mean by "full set of offsets in the topic"? Is this
the sum of all offsets of the changelog partitions of the task?

2) I am not sure whether non-logged stateful tasks should be
effectively treated as stateless tasks during assignment. First we
need to decide whether a non-logged stateful task should preferably be
assigned to the same instance on which it just ran, so that it can
continue to use its state, or not.

3) In the example, you define stand-by tasks {S1, S2, ...} but never
use them, because below you use a dedicated row for stand-by tasks.

As a side note to 2) since it is not directly related to this KIP: We
should decide if we want to avoid the possible non-determinism
introduced by non-logged stores or not. That is, if an instance hosts
a task with non-logged stores then we can have two cases after the
next rebalance: a) the task stays on the same instance and continues
to use the same state store as used so far or b) the task is assigned
to another instance and it starts an empty state store. The produced
results for these two cases might differ. To avoid the nondeterminism,
non-logged state stores would need to be wiped out before assignment.
The question then arises of how removing non-logged state stores
before assignment would affect backward compatibility.
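
To make the two cases concrete, here is a minimal sketch in Python. It
is a toy model and not Kafka Streams code: `CountingTask`, `run`, and
the `wipe_before_assignment` flag are hypothetical names made up for
this illustration, and a plain dict stands in for a non-logged store.

```python
# Toy model, not the Kafka Streams API: a task that counts records per key
# in a local store that has no changelog to restore from.

class CountingTask:
    def __init__(self, store=None):
        # A non-logged store cannot be rebuilt on another instance, so a
        # fresh dict stands in for "starts with an empty state store".
        self.store = {} if store is None else store

    def process(self, key):
        self.store[key] = self.store.get(key, 0) + 1
        return key, self.store[key]


def run(records_before, records_after, sticky, wipe_before_assignment=False):
    """Process records, simulate one rebalance, and return the later outputs."""
    task = CountingTask()
    for key in records_before:
        task.process(key)

    local_store = task.store
    if wipe_before_assignment:
        local_store = {}  # hypothetical policy: always drop non-logged state

    # a) sticky: same instance, old store; b) moved: empty store elsewhere
    task = CountingTask(local_store if sticky else None)
    return [task.process(key) for key in records_after]


before, after = ["a", "a", "b"], ["a", "b"]

stayed = run(before, after, sticky=True)
moved = run(before, after, sticky=False)
print(stayed)  # [('a', 3), ('b', 2)]
print(moved)   # [('a', 1), ('b', 1)]

wiped_stayed = run(before, after, sticky=True, wipe_before_assignment=True)
wiped_moved = run(before, after, sticky=False, wipe_before_assignment=True)
print(wiped_stayed == wiped_moved)  # True
```

In this toy model the output after the rebalance depends on where the
task lands unless the store is wiped first. Wiping makes the result
deterministic, but it also discards state that current deployments may
rely on, which is the compatibility concern.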

Best,
Bruno

On Wed, Aug 21, 2019 at 11:40 PM John Roesler <john@confluent.io> wrote:
>
> Hi Guozhang,
>
> > My impression from your previous email is that inside the algorithm when
> > we are "filling" them to instances some deterministic logic would be used
> > to avoid the above case, is that correct?
>
> Yes, that was my plan, but I didn't formalize it. There was a requirement
> that the assignment algorithm must not produce a new assignment if the
> current assignment is already balanced, so at the least, any thrashing
> would be restricted to the "balancing" phase while tasks are moving around
> the cluster.
>
> Anyway, I think it would be good to say that we'll "try to" produce stable
> assignments, so I've added a "should" clause to the assignment spec:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
>
> For example, we would sort the stateless tasks and available instances
> before assigning them, so that the stateless task assignment would mostly
> stay stable between assignments, modulo the compute capacity of the
> instances changing a little as active stateful tasks get assigned in more
> balanced ways.
>
> Thanks,
> -John
>
>
> On Wed, Aug 21, 2019 at 1:55 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Hello John,
> >
> > That sounds reasonable. Just double checked the code that with logging
> > disabled the corresponding checkpoint file would not contain any values,
> > just like a stateless task. So I think treating them logically the same is
> > fine.
> >
> > Guozhang
> >
> >
> > On Wed, Aug 21, 2019 at 11:41 AM John Roesler <john@confluent.io> wrote:
> >
> > > Hi again, Guozhang,
> > >
> > > While writing up the section on stateless tasks (
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statelesstasks
> > > ),
> > > I reconsidered whether stateful, but non-logged, tasks should actually
> > > report a lag of zero, versus not reporting any lag. By the definition of
> > > the "StatefulTasksToRankedCandidates" function, the leader would compute
> > a
> > > lag of zero for these tasks anyway.
> > >
> > > Therefore, I think the same reasoning that I gave you for stateless
> > > tasks applies: since the member and leader will agree on a lag of zero
> > > anyway, we can avoid adding them to the "Task Lags" map and save some
> > > bytes in the JoinGroup request. This would be especially beneficial in
> > > an application that uses remote stores for _all_ its state stores: it
> > > would have an extremely lightweight JoinGroup request, with no task lags
> > > at all.
> > >
> > > WDYT?
> > > -John
> > >
> > > On Wed, Aug 21, 2019 at 1:17 PM John Roesler <john@confluent.io> wrote:
> > >
> > > > Thanks, Guozhang.
> > > >
> > > > (Side note: I noticed on another pass over the discussion that I'd
> > missed
> > > > addressing your comment about the potential race condition between
> > state
> > > > cleanup and lag-based assignment. I've added a solution to the
> > proposal:
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Racebetweenassignmentandstatecleanup
> > > > )
> > > >
> > > > In the JoinGroup (SubscriptionInfo) metadata, stateless tasks are not
> > > > represented at all. This should save us some bytes in the request
> > > metadata.
> > > > If we treated them like non-logged stateful tasks and reported a lag of
> > > 0,
> > > > the only difference is that the assignor would be able to tell which
> > > > members previously hosted that stateless task.
> > > >
> > > > I'd like to make a simplifying assumption that stateless tasks can just
> > > be
> > > > freely reassigned with no regard to stickiness at all, without
> > impacting
> > > > performance. This is almost true. In fact, while assigned a stateless
> > > task,
> > > > a member fetches batches of records from the broker, so if we move the
> > > > stateless task assignment, this buffered input is wasted and just gets
> > > > dropped.
> > > >
> > > > > However, we won't be moving the stateless tasks around all the time
> > > > > (just during rebalances), and we have the requirement that the
> > > > > assignment algorithm must stabilize to guard against perpetually
> > > > > shuffling a stateless task from one node to another. So, my hope is
> > > > > that this small amount of inefficiency would not be a
> > > > > performance-dominating factor. In exchange, we gain the opportunity
> > > > > for the assignment algorithm to use the stateless tasks as "filler"
> > > > > during unbalanced assignments. For example, if there is a node that
> > > > > is just warming up with several standby tasks, maybe the assignment
> > > > > can give more stateless tasks to that node to balance the
> > > > > computational load across the cluster.
> > > >
> > > > It's worth noting that such an assignment would still not be considered
> > > > "balanced", so the ultimately balanced final state of the assignment
> > > (after
> > > > task movements) would still have the desired property that each
> > stateful
> > > > and stateless task is evenly spread across the cluster.
> > > >
> > > > Does that seem reasonable?
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Wed, Aug 21, 2019 at 11:22 AM Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > > >
> > > >> Hello John,
> > > >>
> > > >> I've made another pass on the wiki page again, overall LGTM. One meta
> > > >> comment about the "stateless" tasks: how do we represent them in the
> > > >> metadata? Are they just treated as stateful tasks with logging
> > disabled,
> > > >> or
> > > >> are specially handled? It is not very clear in the description.
> > > >>
> > > >>
> > > >> Guozhang
> > > >>
> > > >> On Wed, Aug 21, 2019 at 8:43 AM John Roesler <john@confluent.io>
> > wrote:
> > > >>
> > > >> > I have also specifically called out that the assignment must achieve
> > > >> both
> > > >> > "instance" and "task" balance:
> > > >> >
> > > >> >
> > > >>
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Defining%22balance%22
> > > >> >
> > > >> > I've also addressed the problem of state stores with logging
> > disabled:
> > > >> >
> > > >> >
> > > >>
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statewithloggingdisabled
> > > >> >
> > > >> > I believe this addresses all the concerns that have been raised to
> > > date.
> > > >> > Apologies if I've overlooked one of your concerns.
> > > >> >
> > > >> > Please give the KIP another read and let me know of any further
> > > >> thoughts!
> > > >> > Hopefully, we can start the voting on this KIP by the end of the
> > week.
> > > >> >
> > > >> > Thanks,
> > > >> > -John
> > > >> >
> > > >> > On Tue, Aug 20, 2019 at 5:16 PM John Roesler <john@confluent.io>
> > > wrote:
> > > >> >
> > > >> > > In response to Bruno's concern #2, I've also added that section to
> > > the
> > > >> > > "Rejected Alternatives" section.
> > > >> > >
> > > >> > > Additionally, after reviewing some other assignment papers, I've
> > > >> > developed
> > > >> > > the concern that specifying which "phases" the assignment
> > algorithm
> > > >> > should
> > > >> > > have, or indeed the logic of it at all, might be a mistake that
> > > >> > > over-constrains our ability to write an optimal algorithm.
> > > Therefore,
> > > >> > I've
> > > >> > > also refactored the KIP to just describe the protocol, and specify
> > > the
> > > >> > > requirements for the assignment algorithm, but not its exact
> > > behavior
> > > >> at
> > > >> > > all.
> > > >> > >
> > > >> > > Thanks,
> > > >> > > -John
> > > >> > >
> > > >> > > On Tue, Aug 20, 2019 at 5:13 PM John Roesler <john@confluent.io>
> > > >> wrote:
> > > >> > >
> > > >> > >> Hi All,
> > > >> > >>
> > > >> > >> Thanks for the discussion. I've been considering the idea of
> > > >> > >> giving the "catching up" tasks a different name/role. I was in
> > > >> > >> favor initially, but after working through some details, I think
> > > >> > >> it causes some problems, which I've written up in the "rejected
> > > >> > >> alternatives" part of the KIP:
> > > >> > >>
> > > >> >
> > > >>
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements
> > > >> > >>
> > > >> > >> Please give it a read and let me know what you think.
> > > >> > >>
> > > >> > >> Thanks,
> > > >> > >> -John
> > > >> > >>
> > > >> > >> On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang <wangguoz@gmail.com
> > >
> > > >> > wrote:
> > > >> > >>
> > > >> > >>> I think I agree with you Sophie. My gut feeling is that 1) standby
> > > >> > >>> tasks not catching up should not be the major concern of the
> > > >> > >>> assignor's algorithm, but should rather be tackled in different
> > > >> > >>> modules, and 2) a lot of optimization can be done at the stream
> > > >> > >>> thread itself, like dedicated threading and larger batching, or
> > > >> > >>> even complicated scheduling mechanisms between running, restoring
> > > >> > >>> and standby tasks. In any case, I think we can take this out of
> > > >> > >>> the scope of KIP-441 for now.
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> Guozhang
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> On Thu, Aug 8, 2019 at 5:48 PM Sophie Blee-Goldman <
> > > >> > sophie@confluent.io>
> > > >> > >>> wrote:
> > > >> > >>>
> > > >> > >>> > > we may have other ways to avoid starving the standby tasks,
> > > >> > >>> > > for example, by using dedicated threads for standby tasks or
> > > >> > >>> > > even consider having *higher priority for standby than
> > > >> > >>> > > active* so that we always try to catch up standby first,
> > > >> > >>> > > then process active
> > > >> > >>> >
> > > >> > >>> > This is an interesting idea, but seems likely to get in the
> > way
> > > of
> > > >> > the
> > > >> > >>> > original idea of this KIP
> > > >> > >>> > -- if we always process standby tasks first, then if we are
> > > >> assigned
> > > >> > a
> > > >> > >>> new
> > > >> > >>> > standby task we
> > > >> > >>> > will have to wait for it to catch up completely before
> > > processing
> > > >> any
> > > >> > >>> > active tasks! That's
> > > >> > >>> > even worse than the situation this KIP is trying to help with,
> > > >> since
> > > >> > a
> > > >> > >>> new
> > > >> > >>> > standby task has to
> > > >> > >>> > restore from 0 (whereas an active task at least can take over
> > > from
> > > >> > >>> wherever
> > > >> > >>> > the standby was).
> > > >> > >>> >
> > > >> > >>> > During restoration -- while there exist any restoring tasks
> > -- I
> > > >> > think
> > > >> > >>> it's
> > > >> > >>> > reasonable to de-prioritize the
> > > >> > >>> > standby tasks and just process restoring and active tasks so
> > > both
> > > >> can
> > > >> > >>> make
> > > >> > >>> > progress. But we should
> > > >> > >>> > let them catch up afterwards somehow -- maybe we can apply
> > some
> > > >> kind
> > > >> > of
> > > >> > >>> > heuristic, like "if we haven't
> > > >> > >>> > processed standbys for X iterations, or Y milliseconds, do so
> > > >> now."
> > > >> > >>> >
> > > >> > >>> > Actually, it might even be beneficial to avoid processing
> > > >> standbys a
> > > >> > >>> record
> > > >> > >>> > or two at a time and instead
> > > >> > >>> > wait for a large enough batch to build up for the RocksDB
> > > >> > bulk-loading
> > > >> > >>> > benefits.
> > > >> > >>> >
> > > >> > >>> > I think the "use dedicated threads for standby" is the more
> > > >> promising
> > > >> > >>> end
> > > >> > >>> > goal, especially since
> > > >> > >>> > if we split restoration into "restoring tasks" then active and
> > > >> > standbys
> > > >> > >>> > share almost nothing. But
> > > >> > >>> > that seems like follow-up work to the current KIP :)
> > > >> > >>> >
> > > >> > >>> > On Thu, Aug 8, 2019 at 5:31 PM Sophie Blee-Goldman <
> > > >> > >>> sophie@confluent.io>
> > > >> > >>> > wrote:
> > > >> > >>> >
> > > >> > >>> > > Stateful tasks with logging disabled seem to be an
> > > >> > >>> > > interesting edge case. On the one hand, for balancing
> > > >> > >>> > > purposes they should be considered stateful since, as
> > > >> > >>> > > Guozhang pointed out, they are still "heavy" in IO costs.
> > > >> > >>> > > But for "catching up" purposes, i.e. when allocating standby
> > > >> > >>> > > tasks that will become active tasks, they should be
> > > >> > >>> > > considered stateless, as there is no meaningful sense of
> > > >> > >>> > > their lag. We should never allocate standby tasks for them
> > > >> > >>> > > during the first rebalance, but should ensure they are
> > > >> > >>> > > evenly distributed across instances. Maybe we should split
> > > >> > >>> > > these into a third category -- after we assign all stateful
> > > >> > >>> > > tasks with logging, we then distribute the set of
> > > >> > >>> > > logging-disabled stateful tasks to improve balance, before
> > > >> > >>> > > lastly distributing stateless tasks?
> > > >> > >>> > >
> > > >> > >>> > > This actually leads into what I was just thinking, which is
> > > >> that we
> > > >> > >>> > really
> > > >> > >>> > > should distinguish the
> > > >> > >>> > > "catch-up" standbys from normal standbys as well as
> > > >> distinguishing
> > > >> > >>> > > actively processing tasks
> > > >> > >>> > > from active tasks that are still in the restore phase. It's
> > > >> > somewhat
> > > >> > >>> > > awkward that today, some
> > > >> > >>> > > active tasks just start processing immediately while others
> > > >> behave
> > > >> > >>> more
> > > >> > >>> > > like standby than active
> > > >> > >>> > > tasks for some time, before switching to real active. They
> > > first
> > > >> > use
> > > >> > >>> the
> > > >> > >>> > > restoreConsumer, then
> > > >> > >>> > > later only the "normal" consumer.
> > > >> > >>> > >
> > > >> > >>> > > However, this restore period is still distinct from normal
> > > >> standbys
> > > >> > >>> in a
> > > >> > >>> > > lot of ways -- the code path
> > > >> > >>> > > for restoring is different than for updating standbys, for
> > > >> example
> > > >> > >>> in how
> > > >> > >>> > > long we block on #poll.
> > > >> > >>> > > So in addition to giving them their own name -- let's go
> > with
> > > >> > >>> restoring
> > > >> > >>> > > task for now -- they really
> > > >> > >>> > > do seem to deserve being their own distinct task. We can
> > > >> optimize
> > > >> > >>> them
> > > >> > >>> > for
> > > >> > >>> > > efficient conversion
> > > >> > >>> > > to active tasks since we know that's what they will be.
> > > >> > >>> > >
> > > >> > >>> > > This resolves some of the awkwardness of dealing with the
> > > >> special
> > > >> > >>> case
> > > >> > >>> > > mentioned above: we
> > > >> > >>> > > find a balanced assignment of stateful and stateless tasks,
> > > and
> > > >> > >>> create
> > > >> > >>> > > restoring tasks as needed.
> > > >> > >>> > > If logging is disabled, no restoring task is created.
> > > >> > >>> > >
> > > >> > >>> > >
> > > >> > >>> > > On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang <
> > > >> wangguoz@gmail.com>
> > > >> > >>> wrote:
> > > >> > >>> > >
> > > >> > >>> > >> Regarding 3) above: I think for active task they should
> > still
> > > >> be
> > > >> > >>> > >> considered
> > > >> > >>> > >> stateful since the processor would still pay IO cost
> > > accessing
> > > >> the
> > > >> > >>> > store,
> > > >> > >>> > >> but they would not have standby tasks?
> > > >> > >>> > >>
> > > >> > >>> > >> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna <
> > > >> bruno@confluent.io>
> > > >> > >>> > wrote:
> > > >> > >>> > >>
> > > >> > >>> > >> > Hi,
> > > >> > >>> > >> >
> > > >> > >>> > >> > Thank you for the KIP!
> > > >> > >>> > >> >
> > > >> > >>> > >> > Some questions/comments:
> > > >> > >>> > >> >
> > > >> > >>> > >> > 1. I am wondering if the "stand-by" tasks that catch up
> > > >> > >>> > >> > state before the active task is switched deserve their own
> > > >> > >>> > >> > name in this KIP and maybe in the code. We have already
> > > >> > >>> > >> > stated that they are not true stand-by tasks, they are not
> > > >> > >>> > >> > configured through `num.standby.replicas`, and maybe they
> > > >> > >>> > >> > also have other properties that distinguish them from true
> > > >> > >>> > >> > stand-by tasks of which we are not aware yet. For example,
> > > >> > >>> > >> > they may be prioritized differently than other tasks.
> > > >> > >>> > >> > Furthermore, the name "stand-by" does not really fit with
> > > >> > >>> > >> > the planned functionality of those tasks. In the
> > > >> > >>> > >> > following, I will call them false stand-by tasks.
> > > >> > >>> > >> >
> > > >> > >>> > >> > 2. Did you consider triggering the probing rebalances not
> > > >> > >>> > >> > at regular time intervals but when the false stand-by
> > > >> > >>> > >> > tasks reach an acceptable lag? If you did, could you add a
> > > >> > >>> > >> > paragraph on why you rejected this idea to the "Rejected
> > > >> > >>> > >> > Alternatives" section?
> > > >> > >>> > >> >
> > > >> > >>> > >> > 3. Are tasks that solely contain stores with disabled
> > > >> > >>> > >> > logging classified as stateful or stateless in the
> > > >> > >>> > >> > algorithm? I would guess stateless, although if possible
> > > >> > >>> > >> > they should be assigned to the same instance they ran on
> > > >> > >>> > >> > before the rebalance. As far as I can see, this special
> > > >> > >>> > >> > case is not handled in the algorithm.
> > > >> > >>> > >> >
> > > >> > >>> > >> > Best,
> > > >> > >>> > >> > Bruno
> > > >> > >>> > >> >
> > > >> > >>> > >> >
> > > >> > >>> > >> >
> > > >> > >>> > >> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang <
> > > >> > wangguoz@gmail.com>
> > > >> > >>> > >> wrote:
> > > >> > >>> > >> > >
> > > >> > >>> > >> > > 1. Sounds good, just wanted to clarify; and it may be
> > > >> > >>> > >> > > worth documenting it so that users would not be surprised
> > > >> > >>> > >> > > when monitoring their footprint.
> > > >> > >>> > >> > >
> > > >> > >>> > >> > > 2. Hmm I see... I think the trade-off can be described
> > as
> > > >> "how
> > > >> > >>> much
> > > >> > >>> > >> > > imbalance would bother you to be willing to pay another
> > > >> > >>> rebalance,
> > > >> > >>> > >> along
> > > >> > >>> > >> > > with potentially more restoration lag", and the current
> > > >> > >>> definition
> > > >> > >>> > of
> > > >> > >>> > >> > > rebalance_factor can be considered as a rough
> > measurement
> > > >> of
> > > >> > >>> that
> > > >> > >>> > >> > > imbalance. Of course one can argue that a finer grained
> > > >> > >>> measurement
> > > >> > >>> > >> could
> > > >> > >>> > >> > > be "resource footprint" like CPU / storage of each
> > > instance
> > > >> > >>> like we
> > > >> > >>> > >> have
> > > >> > >>> > >> > in
> > > >> > >>> > >> > > Kafka broker auto balancing tools, but I'd prefer not
> > > doing
> > > >> > >>> that as
> > > >> > >>> > >> part
> > > >> > >>> > >> > of
> > > >> > >>> > >> > > the library but more as an operational tool in the
> > > future.
> > > >> On
> > > >> > >>> the
> > > >> > >>> > >> other
> > > >> > >>> > >> > > hand, I've seen stateful and stateless tasks having
> > very
> > > >> > >>> different
> > > >> > >>> > >> load,
> > > >> > >>> > >> > > and sometimes the only bottleneck of a Streams app is
> > > just
> > > >> one
> > > >> > >>> > >> stateful
> > > >> > >>> > >> > > sub-topology and whoever gets tasks of that
> > sub-topology
> > > >> > become
> > > >> > >>> > >> hotspot
> > > >> > >>> > >> > > (and that's why our algorithm tries to balance per
> > > >> > sub-topology
> > > >> > >>> as
> > > >> > >>> > >> well),
> > > >> > >>> > >> > > so maybe we can just consider stateful tasks when
> > > >> calculating
> > > >> > >>> this
> > > >> > >>> > >> factor
> > > >> > >>> > >> > > as a very brute force heuristic?
> > > >> > >>> > >> > >
> > > >> > >>> > >> > > 3.a. Thinking about this a bit more, maybe it's better
> > > >> > >>> > >> > > not to try to tackle an unseen enemy just yet, and to
> > > >> > >>> > >> > > observe whether it really emerges later. By then we may
> > > >> > >>> > >> > > have other ways to avoid starving the standby tasks, for
> > > >> > >>> > >> > > example, by using dedicated threads for standby tasks, or
> > > >> > >>> > >> > > even by giving standbys higher priority than actives so
> > > >> > >>> > >> > > that we always try to catch up the standbys first and
> > > >> > >>> > >> > > then process the actives; and if the actives' lag
> > > >> > >>> > >> > > relative to the log-end-offset is increasing, then we
> > > >> > >>> > >> > > should increase capacity, and so on.
> > > >> > >>> > >> > >
> > > >> > >>> > >> > > 4. Actually with KIP-429 this may not be the case: we may
> > > >> > >>> > >> > > not call onPartitionsRevoked prior to rebalance any more,
> > > >> > >>> > >> > > so we would not transition the state to
> > > >> > >>> > >> > > PARTITIONS_REVOKED, and hence not cause the state of the
> > > >> > >>> > >> > > instance to be REBALANCING. In other words, even if an
> > > >> > >>> > >> > > instance is undergoing a rebalance, its state may still
> > > >> > >>> > >> > > be RUNNING and it may still be processing records at the
> > > >> > >>> > >> > > same time.
> > > >> > >>> > >> > >
> > > >> > >>> > >> > >
> > > >> > >>> > >> > > On Wed, Aug 7, 2019 at 12:14 PM John Roesler <
> > > >> > john@confluent.io
> > > >> > >>> >
> > > >> > >>> > >> wrote:
> > > >> > >>> > >> > >
> > > >> > >>> > >> > > > Hey Guozhang,
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > Thanks for the review!
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > 1. Yes, even with `num.standby.replicas := 0`, we
> > will
> > > >> still
> > > >> > >>> > >> > temporarily
> > > >> > >>> > >> > > > allocate standby tasks to accomplish a no-downtime
> > task
> > > >> > >>> migration.
> > > >> > >>> > >> > > > Although, I'd argue that this doesn't really violate
> > > the
> > > >> > >>> config,
> > > >> > >>> > as
> > > >> > >>> > >> the
> > > >> > >>> > >> > > > task isn't a true hot standby. As soon as it catches
> > > up,
> > > >> > we'll
> > > >> > >>> > >> > rebalance
> > > >> > >>> > >> > > > again, that task will become active, and the original
> > > >> > instance
> > > >> > >>> > that
> > > >> > >>> > >> > hosted
> > > >> > >>> > >> > > > the active task will no longer have the task assigned
> > > at
> > > >> > all.
> > > >> > >>> Once
> > > >> > >>> > >> the
> > > >> > >>> > >> > > > stateDirCleaner kicks in, we'll free the disk space
> > > from
> > > >> it,
> > > >> > >>> and
> > > >> > >>> > >> > return to
> > > >> > >>> > >> > > > the steady-state of having just one copy of the task
> > in
> > > >> the
> > > >> > >>> > cluster.
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > We can of course do without this, but I feel the
> > > current
> > > >> > >>> proposal
> > > >> > >>> > is
> > > >> > >>> > >> > > > operationally preferable, since it doesn't make
> > > >> configuring
> > > >> > >>> > >> > hot-standbys a
> > > >> > >>> > >> > > > pre-requisite for fast rebalances.
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > 2. Yes, I think your interpretation is what we
> > > >> > >>> > >> > > > intended. The default balance_factor would be 1, as it
> > > >> > >>> > >> > > > is implicitly today. What this does is allow operators
> > > >> > >>> > >> > > > to trade off less balanced assignments against fewer
> > > >> > >>> > >> > > > rebalances. If you have lots of space capacity in your
> > > >> > >>> > >> > > > instances, this may be a perfectly fine tradeoff, and
> > > >> > >>> > >> > > > you may prefer for Streams not to bother streaming GBs
> > > >> > >>> > >> > > > of data from the broker in pursuit of perfect balance.
> > > >> > >>> > >> > > > Not married to this configuration, though. It was
> > > >> > >>> > >> > > > inspired by the related work research we did.
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > 3. I'll take a look
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > 3a. I think this is a good idea. I'd classify it as a
> > > >> type
> > > >> > of
> > > >> > >>> grey
> > > >> > >>> > >> > failure
> > > >> > >>> > >> > > > detection. It may make more sense to tackle grey
> > > >> failures as
> > > >> > >>> part
> > > >> > >>> > of
> > > >> > >>> > >> > the
> > > >> > >>> > >> > > > heartbeat protocol (as I POCed here:
> > > >> > >>> > >> > > > https://github.com/apache/kafka/pull/7096/files).
> > > WDYT?
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > 4. Good catch! I didn't think about that before.
> > > Looking
> > > >> at
> > > >> > it
> > > >> > >>> > now,
> > > >> > >>> > >> > though,
> > > >> > >>> > >> > > > I wonder if we're actually protected already. The
> > > >> > >>> stateDirCleaner
> > > >> > >>> > >> > thread
> > > >> > >>> > >> > > > only executes if the instance is in RUNNING state,
> > and
> > > >> > KIP-441
> > > >> > >>> > >> > proposes to
> > > >> > >>> > >> > > > use "probing rebalances" to report task lag. Hence,
> > > >> during
> > > >> > the
> > > >> > >>> > >> window
> > > >> > >>> > >> > > > between when the instance reports a lag and the
> > > assignor
> > > >> > >>> makes a
> > > >> > >>> > >> > decision
> > > >> > >>> > >> > > > about it, the instance should remain in REBALANCING
> > > >> state,
> > > >> > >>> right?
> > > >> > >>> > If
> > > >> > >>> > >> > so,
> > > >> > >>> > >> > > > then this should prevent the race condition. If not,
> > > >> then we
> > > >> > >>> do
> > > >> > >>> > >> indeed
> > > >> > >>> > >> > need
> > > >> > >>> > >> > > > to do something about it.
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > 5. Good idea. I think that today, you can only see
> > the
> > > >> > >>> consumer
> > > >> > >>> > lag,
> > > >> > >>> > >> > which
> > > >> > >>> > >> > > > is a poor substitute. I'll add some metrics to the
> > > >> proposal.
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > Thanks again for the comments!
> > > >> > >>> > >> > > > -John
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > On Tue, Aug 6, 2019 at 4:27 PM Guozhang Wang <
> > > >> > >>> wangguoz@gmail.com>
> > > >> > >>> > >> > wrote:
> > > >> > >>> > >> > > >
> > > >> > >>> > >> > > > > Hello Sophie,
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > Thanks for the proposed KIP. I left some comments on the
> > > >> > >>> > >> > > > > wiki itself, and I think I'm still not very clear on a
> > > >> > >>> > >> > > > > couple of those:
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > 1. With this proposal, does that mean with
> > > >> > >>> num.standby.replicas
> > > >> > >>> > ==
> > > >> > >>> > >> > 0, we
> > > >> > >>> > >> > > > > may sometimes still have some standby tasks which
> > may
> > > >> > >>> violate
> > > >> > >>> > the
> > > >> > >>> > >> > config?
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > 2. I think I understand the rationale to consider lags
> > > >> > >>> > >> > > > > that are below the specified threshold to be equal,
> > > >> > >>> > >> > > > > rather than still considering 5000 to be better than
> > > >> > >>> > >> > > > > 5001 -- we do not want to "over-optimize" and
> > > >> > >>> > >> > > > > potentially fall into endless rebalances back and forth.
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > But I'm not clear about the rationale of the second
> > > >> > >>> > >> > > > > parameter of
> > > >> > >>> > >> > > > > constrainedBalancedAssignment(StatefulTasksToRankedCandidates,
> > > >> > >>> > >> > > > > balance_factor):
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > Does that mean, e.g. with balance_factor of 3, we'd
> > > >> > >>> > >> > > > > consider two assignments, one resulting in
> > > >> > >>> > >> > > > > balance_factor 0 and one resulting in balance_factor 3,
> > > >> > >>> > >> > > > > to be equally optimized assignments and therefore may
> > > >> > >>> > >> > > > > "stop early"? This was not very convincing to me :P
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > 3. There are a couple of minor comments about the
> > > >> > algorithm
> > > >> > >>> > >> itself,
> > > >> > >>> > >> > left
> > > >> > >>> > >> > > > on
> > > >> > >>> > >> > > > > the wiki page since it needs to refer to the exact
> > > line
> > > >> > and
> > > >> > >>> > better
> > > >> > >>> > >> > > > > displayed there.
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > 3.a Another wild thought about the threshold itself:
> > > >> > >>> > >> > > > > today the assignment itself is memoryless, so we would
> > > >> > >>> > >> > > > > not know if the reported `TaskLag` itself is increasing
> > > >> > >>> > >> > > > > or decreasing even if the current value is under the
> > > >> > >>> > >> > > > > threshold. I wonder if it is worthwhile to make it a bit
> > > >> > >>> > >> > > > > more complicated and track the task lag trend at the
> > > >> > >>> > >> > > > > assignor? Practically, it may not be very uncommon that
> > > >> > >>> > >> > > > > standby tasks are not keeping up because other active
> > > >> > >>> > >> > > > > tasks hosted on the same thread are starving the
> > > >> > >>> > >> > > > > standby tasks.
> > > >> > >>> > >> > > > >
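[Editor's sketch, not part of the original message.] One way to picture the "track the lag trend at the assignor" thought in 3.a is a small per-task history of reported lags. Everything here (LagTrendTracker, the window size, the "strictly increasing over the whole window" rule) is a hypothetical illustration, not something the KIP specifies.

```python
from collections import defaultdict, deque


class LagTrendTracker:
    """Remembers the last few reported lags per task (hypothetical helper)."""

    def __init__(self, window=3):
        self._window = window
        # Each task keeps at most `window` samples; older ones drop off.
        self._history = defaultdict(lambda: deque(maxlen=window))

    def record(self, task_id, lag):
        self._history[task_id].append(lag)

    def is_falling_behind(self, task_id):
        """True only if a full window of samples is strictly increasing."""
        samples = list(self._history[task_id])
        if len(samples) < self._window:
            return False  # not enough history to call it a trend
        return all(later > earlier for earlier, later in zip(samples, samples[1:]))
```

A task whose lag reads 100, 200, 300 across three rebalances would be flagged even if 300 is still under the threshold, while 300, 200, 250 would not. The cost is that the assignor is no longer memoryless, which is the trade-off the message raises.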
> > > >> > >>> > >> > > > > 4. There's a potential race condition risk when
> > > >> > >>> > >> > > > > reporting `TaskLags` in the subscription: right after
> > > >> > >>> > >> > > > > reporting it to the leader, the cleanup thread kicks in
> > > >> > >>> > >> > > > > and deletes the state directory. If the task was
> > > >> > >>> > >> > > > > assigned to the host it would cause it to restore from
> > > >> > >>> > >> > > > > the beginning and effectively make the seemingly
> > > >> > >>> > >> > > > > optimized assignment very sub-optimal.
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > To be on the safer side we should consider either
> > > >> > >>> > >> > > > > pruning out those tasks that are "close to being
> > > >> > >>> > >> > > > > cleaned up" from the subscription, or delaying the
> > > >> > >>> > >> > > > > cleanup right after we've included them in the
> > > >> > >>> > >> > > > > subscription in case they are selected as assigned
> > > >> > >>> > >> > > > > tasks by the assignor.
> > > >> > >>> > >> > > > >
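[Editor's sketch, not part of the original message.] The first option in point 4, pruning tasks that are "close to being cleaned up" before reporting lags, might be sketched as below. The function name, the idle-time bookkeeping and the safety margin are all assumptions made for illustration; cleanup_delay_ms is meant to play the role of the Streams state.cleanup.delay.ms setting.

```python
def tasks_to_report(task_lags, idle_ms_by_task, cleanup_delay_ms, safety_margin_ms):
    """Return only the lags whose state directory is not about to be deleted.

    A task is dropped when its state has been idle for nearly the whole
    cleanup delay, since the cleanup thread may remove it right after the
    subscription is sent. Tasks with no idle time recorded are treated as
    in use (idle for 0 ms).
    """
    cutoff_ms = cleanup_delay_ms - safety_margin_ms
    return {
        task: lag
        for task, lag in task_lags.items()
        if idle_ms_by_task.get(task, 0) < cutoff_ms
    }
```

With a 600000 ms cleanup delay and a 10000 ms margin, a task idle for 595000 ms would be left out of the subscription, so the leader never counts on state that is likely to be gone by the time the assignment arrives. The second option in the message, delaying cleanup instead, would avoid dropping the task but needs coordination with the cleanup thread.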
> > > >> > >>> > >> > > > > 5. This is a meta comment: I think it would be helpful
> > > >> > >>> > >> > > > > to add some user visibility on the standby tasks'
> > > >> > >>> > >> > > > > lagging as well, via metrics for example. Today it is
> > > >> > >>> > >> > > > > hard for us to observe how far behind our current
> > > >> > >>> > >> > > > > standby tasks are compared to the active tasks and
> > > >> > >>> > >> > > > > whether that lag is increasing or decreasing. As a
> > > >> > >>> > >> > > > > follow-up task, for example, a rebalance should also be
> > > >> > >>> > >> > > > > triggered if we realize that some standby task's lag is
> > > >> > >>> > >> > > > > increasing indefinitely, meaning that it cannot keep up
> > > >> > >>> > >> > > > > (which is another indicator that either you need to add
> > > >> > >>> > >> > > > > more resources with the num.standbys or you are still
> > > >> > >>> > >> > > > > not balanced enough).
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > On Tue, Aug 6, 2019 at 1:32 PM Sophie Blee-Goldman
> > > >> > >>> > >> > > > > <sophie@confluent.io> wrote:
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > > Hey all,
> > > >> > >>> > >> > > > > >
> > > >> > >>> > >> > > > > > I'd like to kick off discussion on KIP-441, aimed at
> > > >> > >>> > >> > > > > > the long restore times in Streams during which further
> > > >> > >>> > >> > > > > > active processing and IQ are blocked. Please give it a
> > > >> > >>> > >> > > > > > read and let us know your thoughts
> > > >> > >>> > >> > > > > >
> > > >> > >>> > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > > >> > >>> > >> > > > > >
> > > >> > >>> > >> > > > > > Cheers,
> > > >> > >>> > >> > > > > > Sophie
> > > >> > >>> > >> > > > > >
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > >
> > > >> > >>> > >> > > > > --
> > > >> > >>> > >> > > > > -- Guozhang
> > > >> > >>> > >> > > > >

