kafka-dev mailing list archives

From John Roesler <j...@confluent.io>
Subject Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams
Date Wed, 21 Aug 2019 15:43:16 GMT
I have also specifically called out that the assignment must achieve both
"instance" and "task" balance:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Defining%22balance%22
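
Roughly, my reading of "instance" vs "task" balance in code form -- every
name below is made up for illustration and is not from the KIP or the code
base:

import java.util.Collections;
import java.util.Map;

public class BalanceCheckSketch {

    // "Instance" balance: task counts per instance differ by no more than
    // the configured balance factor.
    static boolean isInstanceBalanced(final Map<String, Integer> taskCountPerInstance,
                                      final int balanceFactor) {
        final int min = Collections.min(taskCountPerInstance.values());
        final int max = Collections.max(taskCountPerInstance.values());
        return max - min <= balanceFactor;
    }

    // "Task" balance: the tasks of each individual subtopology are also
    // spread evenly (instances hosting none of them should appear with 0).
    static boolean isTaskBalanced(final Map<String, Map<String, Integer>> countsPerSubtopology,
                                  final int balanceFactor) {
        for (final Map<String, Integer> perInstance : countsPerSubtopology.values()) {
            if (!isInstanceBalanced(perInstance, balanceFactor)) {
                return false;
            }
        }
        return true;
    }
}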

I've also addressed the problem of state stores with logging disabled:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statewithloggingdisabled

I believe this addresses all the concerns that have been raised to date.
Apologies if I've overlooked one of your concerns.

Please give the KIP another read and let me know of any further thoughts!
Hopefully, we can start the voting on this KIP by the end of the week.

Thanks,
-John

On Tue, Aug 20, 2019 at 5:16 PM John Roesler <john@confluent.io> wrote:

> In response to Bruno's concern #2, I've also added that section to the
> "Rejected Alternatives" section.
>
> Additionally, after reviewing some other assignment papers, I've developed
> the concern that specifying which "phases" the assignment algorithm should
> have, or indeed the logic of it at all, might be a mistake that
> over-constrains our ability to write an optimal algorithm. Therefore, I've
> also refactored the KIP to just describe the protocol, and specify the
> requirements for the assignment algorithm, but not its exact behavior at
> all.
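>
> To give a feel for what "requirements, not exact behavior" means, here is a
> rough sketch of the shape of the contract I have in mind -- every name below
> is hypothetical and just for illustration, not from the KIP or the code base:
>
> import java.util.Map;
> import java.util.Set;
>
> interface AssignmentAlgorithmSketch {
>     // Input: per instance, the reported lag of each stateful task, plus the
>     // full task set and the relevant configs. Output: the active tasks to
>     // assign to each instance (standbys would be returned analogously). How
>     // the algorithm arrives at that output is deliberately left unspecified.
>     Map<String, Set<String>> assign(Map<String, Map<String, Long>> taskLagsByInstance,
>                                     Set<String> allTasks,
>                                     int numStandbyReplicas,
>                                     int balanceFactor);
> }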
>
> Thanks,
> -John
>
> On Tue, Aug 20, 2019 at 5:13 PM John Roesler <john@confluent.io> wrote:
>
>> Hi All,
>>
>> Thanks for the discussion. I've been considering the idea of giving the
>> "catching up" tasks a different name/role. I was in favor initially, but
>> after working through some details, I think it causes some problems, which
>> I've written up in the "rejected alternatives" part of the KIP:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements
>>
>> Please give it a read and let me know what you think.
>>
>> Thanks,
>> -John
>>
>> On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>>
>>> I think I agree with you Sophie. My gut feeling is that 1) standby tasks
>>> not catching up should not be a major concern of the assignor's algorithm,
>>> but rather be tackled in different modules, and 2) a lot of optimization
>>> can be done at the stream thread itself, like dedicated threading and
>>> larger batching, or even complicated scheduling mechanisms between running,
>>> restoring and standby tasks. In any case, I think we can take this out of
>>> the scope of KIP-441 for now.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Thu, Aug 8, 2019 at 5:48 PM Sophie Blee-Goldman <sophie@confluent.io>
>>> wrote:
>>>
>>> > > we may have other ways to avoid starving the standby tasks, for
>>> > > example, by using dedicated threads for standby tasks or even consider
>>> > > having *higher priority for standby than active* so that we always try
>>> > > to catch up standby first, then process active
>>> >
>>> > This is an interesting idea, but seems likely to get in the way of the
>>> > original idea of this KIP -- if we always process standby tasks first,
>>> > then if we are assigned a new standby task we will have to wait for it
>>> > to catch up completely before processing any active tasks! That's even
>>> > worse than the situation this KIP is trying to help with, since a new
>>> > standby task has to restore from 0 (whereas an active task at least can
>>> > take over from wherever the standby was).
>>> >
>>> > During restoration -- while there exist any restoring tasks -- I think
>>> > it's reasonable to de-prioritize the standby tasks and just process
>>> > restoring and active tasks so both can make progress. But we should let
>>> > them catch up afterwards somehow -- maybe we can apply some kind of
>>> > heuristic, like "if we haven't processed standbys for X iterations, or Y
>>> > milliseconds, do so now."
>>> >
>>> > Actually, it might even be beneficial to avoid processing standbys a
>>> > record or two at a time and instead wait for a large enough batch to
>>> > build up for the RocksDB bulk-loading benefits.
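>>> >
>>> > To make that heuristic concrete, here's a rough, self-contained Java
>>> > sketch -- every class, field, and threshold name below is made up for
>>> > illustration and is not the actual StreamThread code:
>>> >
>>> > class StandbyCatchUpPolicy {
>>> >     static final int MAX_IDLE_ITERATIONS = 100;
>>> >     static final long MAX_IDLE_MS = 30_000L;
>>> >     static final int MIN_BATCH = 1_000;  // batch large enough for RocksDB bulk-loading to pay off
>>> >
>>> >     private int idleIterations = 0;
>>> >     private long lastUpdateMs = System.currentTimeMillis();
>>> >
>>> >     // Called once per processing-loop iteration while restoring tasks exist.
>>> >     boolean shouldUpdateStandbys(final long nowMs, final int bufferedStandbyRecords) {
>>> >         idleIterations++;
>>> >         final boolean update = idleIterations >= MAX_IDLE_ITERATIONS
>>> >             || nowMs - lastUpdateMs >= MAX_IDLE_MS
>>> >             || bufferedStandbyRecords >= MIN_BATCH;
>>> >         if (update) {
>>> >             idleIterations = 0;
>>> >             lastUpdateMs = nowMs;
>>> >         }
>>> >         return update;
>>> >     }
>>> > }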
>>> >
>>> > I think the "use dedicated threads for standby" is the more promising
>>> > end goal, especially since if we split restoration into "restoring
>>> > tasks" then active and standbys share almost nothing. But that seems
>>> > like follow-up work to the current KIP :)
>>> >
>>> > On Thu, Aug 8, 2019 at 5:31 PM Sophie Blee-Goldman <sophie@confluent.io>
>>> > wrote:
>>> >
>>> > > Stateful tasks with logging disabled seem to be an interesting edge
>>> > > case. On the one hand, for balancing purposes they should be considered
>>> > > stateful since, as Guozhang pointed out, they are still "heavy" in IO
>>> > > costs. But for "catching up" purposes, ie when allocating standby tasks
>>> > > that will become active tasks, they should be considered stateless as
>>> > > there is no meaningful sense of their lag. We should never allocate
>>> > > standby tasks for them during the first rebalance, but should ensure
>>> > > they are evenly distributed across instances. Maybe we should split
>>> > > these into a third category -- after we assign all stateful tasks with
>>> > > logging, we then distribute the set of logging-disabled stateful tasks
>>> > > to improve balance, before lastly distributing stateless tasks?
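>>> > >
>>> > > Just to make those three buckets concrete, a small Java sketch of the
>>> > > categorization step (every name and type here is made up, this is not
>>> > > the assignor code):
>>> > >
>>> > > import java.util.*;
>>> > >
>>> > > class TaskBucketsSketch {
>>> > >     // Returns [statefulLogged, statefulUnlogged, stateless], to be
>>> > >     // assigned in that order.
>>> > >     static List<List<String>> bucketTasks(final Map<String, Boolean> isStateful,
>>> > >                                           final Map<String, Boolean> isLogged) {
>>> > >         final List<String> statefulLogged = new ArrayList<>();
>>> > >         final List<String> statefulUnlogged = new ArrayList<>();
>>> > >         final List<String> stateless = new ArrayList<>();
>>> > >         for (final String task : isStateful.keySet()) {
>>> > >             if (!isStateful.get(task)) {
>>> > >                 stateless.add(task);
>>> > >             } else if (isLogged.get(task)) {
>>> > >                 statefulLogged.add(task);
>>> > >             } else {
>>> > >                 statefulUnlogged.add(task);   // balanced, but no standbys and no lag
>>> > >             }
>>> > >         }
>>> > >         return List.of(statefulLogged, statefulUnlogged, stateless);
>>> > >     }
>>> > > }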
>>> > >
>>> > > This actually leads into what I was just thinking, which is that we
>>> > > really should distinguish the "catch-up" standbys from normal standbys,
>>> > > as well as distinguishing actively processing tasks from active tasks
>>> > > that are still in the restore phase. It's somewhat awkward that today,
>>> > > some active tasks just start processing immediately while others behave
>>> > > more like standby than active tasks for some time, before switching to
>>> > > real active. They first use the restoreConsumer, then later only the
>>> > > "normal" consumer.
>>> > >
>>> > > However, this restore period is still distinct from normal standbys in
>>> > > a lot of ways -- the code path for restoring is different than for
>>> > > updating standbys, for example in how long we block on #poll. So in
>>> > > addition to giving them their own name -- let's go with restoring task
>>> > > for now -- they really do seem to deserve being their own distinct
>>> > > task. We can optimize them for efficient conversion to active tasks
>>> > > since we know that's what they will be.
>>> > >
>>> > > This resolves some of the awkwardness of dealing with the special case
>>> > > mentioned above: we find a balanced assignment of stateful and
>>> > > stateless tasks, and create restoring tasks as needed. If logging is
>>> > > disabled, no restoring task is created.
>>> > >
>>> > >
>>> > > On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>>> > >
>>> > >> Regarding 3) above: I think for active tasks they should still be
>>> > >> considered stateful since the processor would still pay IO cost
>>> > >> accessing the store, but they would not have standby tasks?
>>> > >>
>>> > >> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna <bruno@confluent.io> wrote:
>>> > >>
>>> > >> > Hi,
>>> > >> >
>>> > >> > Thank you for the KIP!
>>> > >> >
>>> > >> > Some questions/comments:
>>> > >> >
>>> > >> > 1. I am wondering if the "stand-by" tasks that catch up state before
>>> > >> > the active task is switched deserve their own name in this KIP and
>>> > >> > maybe in the code. We have already stated that they are not true
>>> > >> > stand-by tasks, they are not configured through `num.standby.replicas`,
>>> > >> > and maybe they also have other properties that distinguish them from
>>> > >> > true stand-by tasks of which we are not aware yet. For example, they
>>> > >> > may be prioritized differently than other tasks. Furthermore, the name
>>> > >> > "stand-by" does not really fit with the planned functionality of those
>>> > >> > tasks. In the following, I will call them false stand-by tasks.
>>> > >> >
>>> > >> > 2. Did you consider triggering the probing rebalances not at regular
>>> > >> > time intervals but when the false stand-by tasks reach an acceptable
>>> > >> > lag? If you did consider it, could you add a paragraph on why you
>>> > >> > rejected this idea to the "Rejected Alternatives" section?
>>> > >> >
>>> > >> > 3. Are tasks that solely contain stores with disabled logging
>>> > >> > classified as stateful or stateless in the algorithm? I would guess
>>> > >> > stateless, although if possible they should be assigned to the same
>>> > >> > instance they had run on before the rebalance. As far as I can see,
>>> > >> > this special case is not handled in the algorithm.
>>> > >> >
>>> > >> > Best,
>>> > >> > Bruno
>>> > >> >
>>> > >> >
>>> > >> >
>>> > >> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang <wangguoz@gmail.com> wrote:
>>> > >> > >
>>> > >> > > 1. Sounds good, just wanted to clarify; and it may be worth
>>> > >> > > documenting it so that users would not be surprised when monitoring
>>> > >> > > their footprint.
>>> > >> > >
>>> > >> > > 2. Hmm I see... I think the trade-off can be described as "how much
>>> > >> > > imbalance would bother you enough to be willing to pay another
>>> > >> > > rebalance, along with potentially more restoration lag", and the
>>> > >> > > current definition of balance_factor can be considered a rough
>>> > >> > > measurement of that imbalance. Of course one can argue that a finer
>>> > >> > > grained measurement could be "resource footprint" like CPU / storage
>>> > >> > > of each instance, like we have in Kafka broker auto balancing tools,
>>> > >> > > but I'd prefer not doing that as part of the library but more as an
>>> > >> > > operational tool in the future. On the other hand, I've seen stateful
>>> > >> > > and stateless tasks having very different load, and sometimes the
>>> > >> > > only bottleneck of a Streams app is just one stateful sub-topology,
>>> > >> > > and whoever gets tasks of that sub-topology becomes the hotspot (and
>>> > >> > > that's why our algorithm tries to balance per sub-topology as well),
>>> > >> > > so maybe we can just consider stateful tasks when calculating this
>>> > >> > > factor, as a very brute force heuristic?
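>>> > >> > >
>>> > >> > > For example, a tiny Java sketch of that brute-force heuristic --
>>> > >> > > measuring the factor over stateful tasks only (made-up names, not
>>> > >> > > the proposed code):
>>> > >> > >
>>> > >> > > import java.util.Map;
>>> > >> > > import java.util.Set;
>>> > >> > >
>>> > >> > > class StatefulBalanceSketch {
>>> > >> > >     static int statefulBalanceFactor(final Map<String, Set<String>> statefulTasksByInstance) {
>>> > >> > >         int min = Integer.MAX_VALUE;
>>> > >> > >         int max = 0;
>>> > >> > >         for (final Set<String> tasks : statefulTasksByInstance.values()) {
>>> > >> > >             min = Math.min(min, tasks.size());
>>> > >> > >             max = Math.max(max, tasks.size());
>>> > >> > >         }
>>> > >> > >         return max - min;   // 0 == perfectly balanced w.r.t. stateful tasks
>>> > >> > >     }
>>> > >> > > }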
>>> > >> > >
>>> > >> > > 3.a. Thinking about this a bit more, maybe it's better not to try to
>>> > >> > > tackle an unseen enemy just yet, and observe if it really emerges
>>> > >> > > later; by then we may have other ways to avoid starving the standby
>>> > >> > > tasks, for example, by using dedicated threads for standby tasks or
>>> > >> > > even consider having higher priority for standby than active so that
>>> > >> > > we always try to catch up standby first, then process active; and if
>>> > >> > > the active's lag compared to log-end-offset is increasing then we
>>> > >> > > should increase capacity, etc etc.
>>> > >> > >
>>> > >> > > 4. Actually with KIP-429 this may not be the case: we may not call
>>> > >> > > onPartitionsRevoked prior to a rebalance any more, so we would not
>>> > >> > > transit state to PARTITIONS_REVOKED, and hence not cause the state
>>> > >> > > of the instance to be REBALANCING. In other words, even if an
>>> > >> > > instance is undergoing a rebalance its state may still be RUNNING
>>> > >> > > and it may still be processing records at the same time.
>>> > >> > >
>>> > >> > >
>>> > >> > > On Wed, Aug 7, 2019 at 12:14 PM John Roesler <john@confluent.io> wrote:
>>> > >> > >
>>> > >> > > > Hey Guozhang,
>>> > >> > > >
>>> > >> > > > Thanks for the review!
>>> > >> > > >
>>> > >> > > > 1. Yes, even with `num.standby.replicas := 0`, we will still
>>> > >> > > > temporarily allocate standby tasks to accomplish a no-downtime task
>>> > >> > > > migration. Although, I'd argue that this doesn't really violate the
>>> > >> > > > config, as the task isn't a true hot standby. As soon as it catches
>>> > >> > > > up, we'll rebalance again, that task will become active, and the
>>> > >> > > > original instance that hosted the active task will no longer have
>>> > >> > > > the task assigned at all. Once the stateDirCleaner kicks in, we'll
>>> > >> > > > free the disk space from it, and return to the steady-state of
>>> > >> > > > having just one copy of the task in the cluster.
>>> > >> > > >
>>> > >> > > > We can of course do without this, but I feel the current proposal is
>>> > >> > > > operationally preferable, since it doesn't make configuring
>>> > >> > > > hot-standbys a pre-requisite for fast rebalances.
>>> > >> > > >
>>> > >> > > > 2. Yes, I think your interpretation is what we intended. The default
>>> > >> > > > balance_factor would be 1, as it is implicitly today. What this does
>>> > >> > > > is allow operators to trade off less balanced assignments against
>>> > >> > > > fewer rebalances. If you have lots of spare capacity in your
>>> > >> > > > instances, this may be a perfectly fine tradeoff, and you may prefer
>>> > >> > > > for Streams not to bother streaming GBs of data from the broker in
>>> > >> > > > pursuit of perfect balance. Not married to this configuration,
>>> > >> > > > though. It was inspired by the related-work research we did.
>>> > >> > > >
>>> > >> > > > 3. I'll take a look
>>> > >> > > >
>>> > >> > > > 3a. I think this is a good idea. I'd classify it as a type of grey
>>> > >> > > > failure detection. It may make more sense to tackle grey failures as
>>> > >> > > > part of the heartbeat protocol (as I POCed here:
>>> > >> > > > https://github.com/apache/kafka/pull/7096/files). WDYT?
>>> > >> > > >
>>> > >> > > > 4. Good catch! I didn't think about that before. Looking at it now,
>>> > >> > > > though, I wonder if we're actually protected already. The
>>> > >> > > > stateDirCleaner thread only executes if the instance is in RUNNING
>>> > >> > > > state, and KIP-441 proposes to use "probing rebalances" to report
>>> > >> > > > task lag. Hence, during the window between when the instance reports
>>> > >> > > > a lag and the assignor makes a decision about it, the instance
>>> > >> > > > should remain in REBALANCING state, right? If so, then this should
>>> > >> > > > prevent the race condition. If not, then we do indeed need to do
>>> > >> > > > something about it.
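>>> > >> > > >
>>> > >> > > > In other words, the protection I have in mind is roughly the
>>> > >> > > > following guard (a sketch with made-up names, not the actual
>>> > >> > > > StateDirectory code):
>>> > >> > > >
>>> > >> > > > import java.io.File;
>>> > >> > > > import java.util.List;
>>> > >> > > >
>>> > >> > > > class StateDirCleanerSketch {
>>> > >> > > >     void maybeClean(final String instanceState, final List<File> obsoleteTaskDirs) {
>>> > >> > > >         if (!"RUNNING".equals(instanceState)) {
>>> > >> > > >             return;   // e.g. REBALANCING: keep dirs whose lags we just reported
>>> > >> > > >         }
>>> > >> > > >         for (final File dir : obsoleteTaskDirs) {
>>> > >> > > >             dir.delete();   // real cleaner deletes recursively; enough for a sketch
>>> > >> > > >         }
>>> > >> > > >     }
>>> > >> > > > }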
>>> > >> > > >
>>> > >> > > > 5. Good idea. I think that today, you can only see the consumer lag,
>>> > >> > > > which is a poor substitute. I'll add some metrics to the proposal.
>>> > >> > > >
>>> > >> > > > Thanks again for the comments!
>>> > >> > > > -John
>>> > >> > > >
>>> > >> > > > On Tue, Aug 6, 2019 at 4:27 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>>> > >> > > >
>>> > >> > > > > Hello Sophie,
>>> > >> > > > >
>>> > >> > > > > Thanks for the proposed KIP. I left some comments on the wiki
>>> > >> > > > > itself, and I think I'm still not very clear on a couple of those:
or those:
>>> > >> > > > >
>>> > >> > > > > 1. With this proposal, does that mean with num.standby.replicas == 0,
>>> > >> > > > > we may sometimes still have some standby tasks which may violate the
>>> > >> > > > > config?
>>> > >> > > > >
>>> > >> > > > > 2. I think I understand the rationale to consider lags that are below
>>> > >> > > > > the specified threshold to be equal, rather than still considering
>>> > >> > > > > 5000 to be better than 5001 -- we do not want to "over-optimize" and
>>> > >> > > > > potentially fall into endless rebalances back and forth.
>>> > >> > > > >
>>> > >> > > > > But I'm not clear about the rationale of the second parameter of
>>> > >> > > > > constrainedBalancedAssignment(StatefulTasksToRankedCandidates,
>>> > >> > > > > balance_factor):
>>> > >> > > > > Does that mean, e.g. with a balance_factor of 3, we'd consider two
>>> > >> > > > > assignments, one resulting in balance_factor 0 and one resulting in
>>> > >> > > > > balance_factor 3, to be equally optimized assignments and therefore
>>> > >> > > > > may "stop early"? This was not very convincing to me :P
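>>> > >> > > > >
>>> > >> > > > > To spell out how I'm reading that semantics, a tiny sketch with
>>> > >> > > > > made-up names (not the proposed code):
>>> > >> > > > >
>>> > >> > > > > class BalanceFactorSemanticsSketch {
>>> > >> > > > >     // any assignment whose measured imbalance is <= balance_factor would
>>> > >> > > > >     // count as "balanced enough" and let the algorithm stop early:
>>> > >> > > > >     static boolean balancedEnough(final int measuredImbalance, final int balanceFactor) {
>>> > >> > > > >         return measuredImbalance <= balanceFactor;   // 0 and 3 both pass when it is 3
>>> > >> > > > >     }
>>> > >> > > > > }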
>>> > >> > > > >
>>> > >> > > > > 3. There are a couple of minor comments about the algorithm itself,
>>> > >> > > > > left on the wiki page since they need to refer to the exact lines and
>>> > >> > > > > are better displayed there.
>>> > >> > > > >
>>> > >> > > > > 3.a Another wild thought about the threshold itself: today the
>>> > >> > > > > assignment itself is memoryless, so we would not know if the reported
>>> > >> > > > > `TaskLag` itself is increasing or decreasing even if the current
>>> > >> > > > > value is under the threshold. I wonder if it is worth making it a bit
>>> > >> > > > > more complicated to track the task lag trend at the assignor?
>>> > >> > > > > Practically it may not be very uncommon that stand-by tasks are not
>>> > >> > > > > keeping up due to the fact that other active tasks hosted on the same
>>> > >> > > > > thread are starving the standby tasks.
>>> > >> > > > >
>>> > >> > > > > 4. There's a potential race condition risk when reporting `TaskLags`
>>> > >> > > > > in the subscription: right after reporting it to the leader, the
>>> > >> > > > > cleanup thread kicks in and deletes the state directory. If the task
>>> > >> > > > > was then assigned to that host, it would have to restore from the
>>> > >> > > > > beginning and effectively make the seemingly optimized assignment
>>> > >> > > > > very sub-optimal.
>>> > >> > > > >
>>> > >> > > > > To be on the safer side we should consider either pruning out those
>>> > >> > > > > tasks that are "close to being cleaned up" from the subscription, or
>>> > >> > > > > delaying the cleanup right after we've included them in the
>>> > >> > > > > subscription, in case they have been selected as assigned tasks by
>>> > >> > > > > the assignor.
>>> > >> > > > >
>>> > >> > > > > 5. This is a meta comment: I think it would be helpful to add some
>>> > >> > > > > user visibility on the standby tasks' lag as well, via metrics for
>>> > >> > > > > example. Today it is hard for us to observe how far our current
>>> > >> > > > > standby tasks are behind the active tasks and whether that lag is
>>> > >> > > > > increasing or decreasing. As a follow-up task, for example, a
>>> > >> > > > > rebalance should also be triggered if we realize that some standby
>>> > >> > > > > task's lag is increasing indefinitely, which means that it cannot
>>> > >> > > > > keep up (which is another indicator that either you need to add more
>>> > >> > > > > resources with num.standbys or you are still not balanced enough).
>>> > >> > > > >
>>> > >> > > > >
>>> > >> > > > > On Tue, Aug 6, 2019 at 1:32 PM Sophie Blee-Goldman <sophie@confluent.io>
>>> > >> > > > > wrote:
>>> > >> > > > >
>>> > >> > > > > > Hey all,
>>> > >> > > > > >
>>> > >> > > > > > I'd like to kick off discussion on KIP-441, aimed at the long
>>> > >> > > > > > restore times in Streams during which further active processing
>>> > >> > > > > > and IQ are blocked. Please give it a read and let us know your
>>> > >> > > > > > thoughts
>>> > >> > > > > >
>>> > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
>>> > >> > > > > >
>>> > >> > > > > > Cheers,
>>> > >> > > > > > Sophie
>>> > >> > > > > >
>>> > >> > > > >
>>> > >> > > > >
>>> > >> > > > > --
>>> > >> > > > > -- Guozhang
>>> > >> > > > >
>>> > >> > > >
>>> > >> > >
>>> > >> > >
>>> > >> > > --
>>> > >> > > -- Guozhang
>>> > >> >
>>> > >>
>>> > >>
>>> > >> --
>>> > >> -- Guozhang
>>> > >>
>>> > >
>>> >
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
