kafka-dev mailing list archives

From Bruno Cadonna <br...@confluent.io>
Subject Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams
Date Wed, 04 Sep 2019 19:10:07 GMT
Hi John,

Thank you for your answer. Your assumptions sound reasonable to me.

Best,
Bruno

On Wed, Sep 4, 2019 at 5:30 PM John Roesler <john@confluent.io> wrote:
>
> Hey Bruno,
>
> Thanks for taking another look. Some quick responses:
>
> 1) It just means the number of offsets in the topic. E.g., the LSO is 100,
> but the first offset is 40 due to retention, so there are 60 offsets in the
> topic. Further, the lag on that topic would be considered to be 60 for any
> task that hadn't previously done any work on it.
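>
> Roughly, in code form, the per-changelog lag computation I have in mind
> looks like the following sketch (endOffset, startOffset, and
> checkpointedOffset are illustrative names here, not the actual
> implementation):
>
>     // e.g., endOffset = 100, startOffset = 40 => 60 offsets in the topic
>     final long offsetsInTopic = endOffset - startOffset;
>     final long lag = (checkpointedOffset == null)
>         ? offsetsInTopic                                // never worked on the task: full lag
>         : Math.max(0, endOffset - checkpointedOffset);  // otherwise, offsets left to restore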
>
> 2) This is undecidable in general. I.e., there's no way we can know whether
> the store is remote or not, and hence whether we can freely assign it to
> another instance, or whether we have to keep it on the same instance.
> However, there are a couple of reasons to go ahead and assume we have the
> freedom to move such tasks.
> * We know that nothing can prevent the loss of an instance in a cluster
> (I.e., this is true of all cloud environments, as well as any managed
> virtualized cluster like mesos or kubernetes), so any Streams program that
> makes use of non-remote, non-logged state is doomed to lose its state when
> it loses an instance.
> * If we take on a restriction that we cannot move such state between
> instances, we'd become overconstrained very quickly. Effectively, if you
> made use of non-logged stores, and we didn't assume freedom of movement,
> then we couldn't make use of any new instances in your cluster.
> * On the other hand, if we assume we can't move such state, and only
> reassign it when we lose an instance, then we're accepting non-deterministic
> behavior, because the program would produce different results depending on
> whether you lost a node during the execution or not.
> 2b) That last point goes along with your side note. I'm not sure if we
> should bother dropping such state on every reassignment, though. It seems
> to be undefined territory enough that we can just do the simplest thing and
> assume people have made their own (external) provisions for durability.
> I.e., when we say "non-logged", we mean that it doesn't make use of _our_
> durability mechanism. I'm arguing that the only sane assumption is that
> such folks have opted to use their own durability measures, and we should
> just assume it works with no special considerations in the assignment
> algorithm.
>
> 3) Good catch! I've fixed it.
>
> Thanks again!
> -John
>
> On Wed, Sep 4, 2019 at 6:09 AM Bruno Cadonna <bruno@confluent.io> wrote:
>
> > Hi,
> >
> > 1) What do you mean with "full set of offsets in the topic"? Is this
> > the sum of all offsets of the changelog partitions of the task?
> >
> > 2) I am not sure whether non-logged stateful tasks should be
> > effectively treated as stateless tasks during assignment. First we
> > need to decide whether a non-logged stateful task should preferably be
> > assigned to the same instance on which it just ran in order to
> > continue to use its state or not.
> >
> > 3) In the example, you define stand-by tasks {S1, S2, ...} but never
> > use them, because below you use a dedicated row for stand-by tasks.
> >
> > As a side note to 2) since it is not directly related to this KIP: We
> > should decide if we want to avoid the possible non-determinism
> > introduced by non-logged stores or not. That is, if an instance hosts
> > a task with non-logged stores then we can have two cases after the
> > next rebalance: a) the task stays on the same instance and continues
> > to use the same state store as used so far or b) the task is assigned
> > to another instance and it starts an empty state store. The produced
> > results for these two cases might differ. To avoid the nondeterminism,
> > non-logged state stores would need to be wiped out before assignment.
> > Then the question arises, how the removal of non-logged state stores
> > before assignment would affect backward-compatibility.
> >
> > Best,
> > Bruno
> >
> > On Wed, Aug 21, 2019 at 11:40 PM John Roesler <john@confluent.io> wrote:
> > >
> > > Hi Guozhang,
> > >
> > > > My impression from your previous email is that inside the algorithm
> > when
> > > we
> > > are "filling" them to instances some deterministic logic would be used to
> > > avoid the above case, is that correct?
> > >
> > > Yes, that was my plan, but I didn't formalize it. There was a requirement
> > > that the assignment algorithm must not produce a new assignment if the
> > > current assignment is already balanced, so at the least, any thrashing
> > > would be restricted to the "balancing" phase while tasks are moving
> > around
> > > the cluster.
> > >
> > > Anyway, I think it would be good to say that we'll "try to" produce
> > stable
> > > assignments, so I've added a "should" clause to the assignment spec:
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
> > >
> > > For example, we would sort the stateless tasks and available instances
> > > before assigning them, so that the stateless task assignment would mostly
> > > stay stable between assignments, modulo the compute capacity of the
> > > instances changing a little as active stateful tasks get assigned in more
> > > balanced ways.
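> > >
> > > As a rough sketch of that kind of deterministic "filling" (the names here
> > > are made up for illustration, not the real assignor code):
> > >
> > >     // sort both sides so repeated rebalances produce the same stateless
> > >     // assignment, absent changes in the instances or their capacity
> > >     final List<String> tasks = new ArrayList<>(statelessTasks);
> > >     final List<String> instances = new ArrayList<>(availableInstances);
> > >     Collections.sort(tasks);
> > >     Collections.sort(instances);
> > >     final Map<String, List<String>> assignment = new HashMap<>();
> > >     int i = 0;
> > >     for (final String task : tasks) {
> > >         // round-robin over the sorted instances
> > >         final String instance = instances.get(i++ % instances.size());
> > >         assignment.computeIfAbsent(instance, k -> new ArrayList<>()).add(task);
> > >     }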
> > >
> > > Thanks,
> > > -John
> > >
> > >
> > > On Wed, Aug 21, 2019 at 1:55 PM Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> > >
> > > > Hello John,
> > > >
> > > > That sounds reasonable. Just double checked the code that with logging
> > > > disabled the corresponding checkpoint file would not contain any
> > values,
> > > > just like a stateless task. So I think treating them logically the
> > same is
> > > > fine.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Aug 21, 2019 at 11:41 AM John Roesler <john@confluent.io>
> > wrote:
> > > >
> > > > > Hi again, Guozhang,
> > > > >
> > > > > While writing up the section on stateless tasks (
> > > > >
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statelesstasks
> > > > > ),
> > > > > I reconsidered whether stateful, but non-logged, tasks should
> > actually
> > > > > report a lag of zero, versus not reporting any lag. By the
> > definition of
> > > > > the "StatefulTasksToRankedCandidates" function, the leader would
> > compute
> > > > a
> > > > > lag of zero for these tasks anyway.
> > > > >
> > > > > Therefore, I think the same reasoning that I supplied you for
> > stateless
> > > > > tasks applies, since the member and leader will agree on a lag of
> > zero
> > > > > anyway, we can avoid adding them to the "Task Lags" map, and save
> > some
> > > > > bytes in the JoinGroup request. This would be especially beneficial
> > in an
> > > > > application that uses remote stores for _all_ its state stores, it
> > would
> > > > > have an extremely lightweight JoinGroup request, with no task lags at
> > > > all.
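> > > > >
> > > > > In pseudo-code, the subscription encoding would be something like this
> > > > > (isStateful, hasLoggedStores, and computeLag are made-up helper names,
> > > > > just for illustration):
> > > > >
> > > > >     // only logged, stateful tasks contribute entries to "Task Lags";
> > > > >     // stateless and non-logged tasks are omitted, since the leader
> > > > >     // treats their lag as zero anyway, so we just save the bytes
> > > > >     final Map<String, Long> taskLags = new HashMap<>();
> > > > >     for (final String task : previousTasks) {
> > > > >         if (isStateful(task) && hasLoggedStores(task)) {
> > > > >             taskLags.put(task, computeLag(task));
> > > > >         }
> > > > >     }
> > > > >     // taskLags is then encoded into the JoinGroup subscription userdata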
> > > > >
> > > > > WDYT?
> > > > > -John
> > > > >
> > > > > On Wed, Aug 21, 2019 at 1:17 PM John Roesler <john@confluent.io>
> > wrote:
> > > > >
> > > > > > Thanks, Guozhang.
> > > > > >
> > > > > > (Side note: I noticed on another pass over the discussion that I'd
> > > > missed
> > > > > > addressing your comment about the potential race condition between
> > > > state
> > > > > > cleanup and lag-based assignment. I've added a solution to the
> > > > proposal:
> > > > > >
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Racebetweenassignmentandstatecleanup
> > > > > > )
> > > > > >
> > > > > > In the JoinGroup (SubscriptionInfo) metadata, stateless tasks are
> > not
> > > > > > represented at all. This should save us some bytes in the request
> > > > > metadata.
> > > > > > If we treated them like non-logged stateful tasks and reported a
> > lag of
> > > > > 0,
> > > > > > the only difference is that the assignor would be able to tell
> > which
> > > > > > members previously hosted that stateless task.
> > > > > >
> > > > > > I'd like to make a simplifying assumption that stateless tasks can
> > just
> > > > > be
> > > > > > freely reassigned with no regard to stickiness at all, without
> > > > impacting
> > > > > > performance. This is almost true. In fact, while assigned a
> > stateless
> > > > > task,
> > > > > > a member fetches batches of records from the broker, so if we move
> > the
> > > > > > stateless task assignment, this buffered input is wasted and just
> > gets
> > > > > > dropped.
> > > > > >
> > > > > > However, we won't be moving the stateless tasks around all the time
> > > > (just
> > > > > > during rebalances), and we have the requirement that the assignment
> > > > > > algorithm must stabilize to guard against perpetually shuffling a
> > > > > stateless
> > > > > > task from one node to another. So, my hope is that this small
> > amount of
> > > > > > inefficiency would not be a performance-dominating factor. In
> > exchange,
> > > > > we
> > > > > > gain the opportunity for the assignment algorithm to use the
> > stateless
> > > > > > tasks as "filler" during unbalanced assignments. For example, if
> > there
> > > > > is a
> > > > > > node that is just warming up with several standby tasks, maybe the
> > > > > > assignment can give more stateless tasks to that node to balance
> > the
> > > > > > computational load across the cluster.
> > > > > >
> > > > > > It's worth noting that such an assignment would still not be
> > considered
> > > > > > "balanced", so the ultimately balanced final state of the
> > assignment
> > > > > (after
> > > > > > task movements) would still have the desired property that each
> > > > stateful
> > > > > > and stateless task is evenly spread across the cluster.
> > > > > >
> > > > > > Does that seem reasonable?
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > > On Wed, Aug 21, 2019 at 11:22 AM Guozhang Wang <wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > >> Hello John,
> > > > > >>
> > > > > >> I've made another pass on the wiki page again, overall LGTM. One
> > meta
> > > > > >> comment about the "stateless" tasks: how do we represent them in
> > the
> > > > > >> metadata? Are they just treated as stateful tasks with logging
> > > > disabled,
> > > > > >> or
> > > > > >> are they handled specially? It is not very clear in the description.
> > > > > >>
> > > > > >>
> > > > > >> Guozhang
> > > > > >>
> > > > > >> On Wed, Aug 21, 2019 at 8:43 AM John Roesler <john@confluent.io>
> > > > wrote:
> > > > > >>
> > > > > >> > I have also specifically called out that the assignment must
> > achieve
> > > > > >> both
> > > > > >> > "instance" and "task" balance:
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Defining%22balance%22
> > > > > >> >
> > > > > >> > I've also addressed the problem of state stores with logging
> > > > disabled:
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statewithloggingdisabled
> > > > > >> >
> > > > > >> > I believe this addresses all the concerns that have been raised
> > to
> > > > > date.
> > > > > >> > Apologies if I've overlooked one of your concerns.
> > > > > >> >
> > > > > >> > Please give the KIP another read and let me know of any further
> > > > > >> thoughts!
> > > > > >> > Hopefully, we can start the voting on this KIP by the end of the
> > > > week.
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > -John
> > > > > >> >
> > > > > >> > On Tue, Aug 20, 2019 at 5:16 PM John Roesler <john@confluent.io
> > >
> > > > > wrote:
> > > > > >> >
> > > > > >> > > In response to Bruno's concern #2, I've also added that
> > section to
> > > > > the
> > > > > >> > > "Rejected Alternatives" section.
> > > > > >> > >
> > > > > >> > > Additionally, after reviewing some other assignment papers,
> > I've
> > > > > >> > developed
> > > > > >> > > the concern that specifying which "phases" the assignment
> > > > algorithm
> > > > > >> > should
> > > > > >> > > have, or indeed the logic of it at all, might be a mistake
> > that
> > > > > >> > > over-constrains our ability to write an optimal algorithm.
> > > > > Therefore,
> > > > > >> > I've
> > > > > >> > > also refactored the KIP to just describe the protocol, and
> > specify
> > > > > the
> > > > > >> > > requirements for the assignment algorithm, but not its exact
> > > > > behavior
> > > > > >> at
> > > > > >> > > all.
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > > -John
> > > > > >> > >
> > > > > >> > > On Tue, Aug 20, 2019 at 5:13 PM John Roesler <
> > john@confluent.io>
> > > > > >> wrote:
> > > > > >> > >
> > > > > >> > >> Hi All,
> > > > > >> > >>
> > > > > >> > >> Thanks for the discussion. I've been considering the idea of
> > > > giving
> > > > > >> the
> > > > > >> > >> "catching up" tasks a different name/role. I was in favor
> > > > > initially,
> > > > > >> but
> > > > > >> > >> after working though some details, I think it causes some
> > > > problems,
> > > > > >> > which
> > > > > >> > >> I've written up in the "rejected alternatives" part of the
> > KIP:
> > > > > >> > >>
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements
> > > > > >> > >>
> > > > > >> > >> Please give it a read and let me know what you think.
> > > > > >> > >>
> > > > > >> > >> Thanks,
> > > > > >> > >> -John
> > > > > >> > >>
> > > > > >> > >> On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang <
> > wangguoz@gmail.com
> > > > >
> > > > > >> > wrote:
> > > > > >> > >>
> > > > > >> > >>> I think I agree with you Sophie. My gut feeling is that 1)
> > it
> > > > > should
> > > > > >> > not
> > > > > >> > >>> be
> > > > > >> > >>> the major concern in assignor's algorithm for standby tasks
> > not
> > > > > >> > catching
> > > > > >> > >>> up, but rather be tackled in different modules, and 2) a
> > lot of
> > > > > >> > >>> optimization can be done at the stream thread itself, like
> > > > > dedicated
> > > > > >> > >>> threading and larger batching, or even complicated
> > scheduling
> > > > > >> > mechanisms
> > > > > >> > >>> between running, restoring and standby tasks. In anyways, I
> > > > think
> > > > > we
> > > > > >> > can
> > > > > >> > >>> take this out of the scope of KIP-441 for now.
> > > > > >> > >>>
> > > > > >> > >>>
> > > > > >> > >>> Guozhang
> > > > > >> > >>>
> > > > > >> > >>>
> > > > > >> > >>> On Thu, Aug 8, 2019 at 5:48 PM Sophie Blee-Goldman <
> > > > > >> > sophie@confluent.io>
> > > > > >> > >>> wrote:
> > > > > >> > >>>
> > > > > >> > >>> > > we may have other ways to not starving the standby
> > tasks,
> > > > for
> > > > > >> > >>> example, by
> > > > > >> > >>> > > using dedicate threads for standby tasks or even
> > consider
> > > > > having
> > > > > >> > >>> > *higher> priority for standby than active* so that we
> > always
> > > > try
> > > > > >> to
> > > > > >> > >>> caught
> > > > > >> > >>> > up standby
> > > > > >> > >>> > > first, then process active
> > > > > >> > >>> >
> > > > > >> > >>> > This is an interesting idea, but seems likely to get in
> > the
> > > > way
> > > > > of
> > > > > >> > the
> > > > > >> > >>> > original idea of this KIP
> > > > > >> > >>> > -- if we always process standby tasks first, then if we
> > are
> > > > > >> assigned
> > > > > >> > a
> > > > > >> > >>> new
> > > > > >> > >>> > standby task we
> > > > > >> > >>> > will have to wait for it to catch up completely before
> > > > > processing
> > > > > >> any
> > > > > >> > >>> > active tasks! That's
> > > > > >> > >>> > even worse than the situation this KIP is trying to help
> > with,
> > > > > >> since
> > > > > >> > a
> > > > > >> > >>> new
> > > > > >> > >>> > standby task has to
> > > > > >> > >>> > restore from 0 (whereas an active task at least can take
> > over
> > > > > from
> > > > > >> > >>> wherever
> > > > > >> > >>> > the standby was).
> > > > > >> > >>> >
> > > > > >> > >>> > During restoration -- while there exist any restoring
> > tasks
> > > > -- I
> > > > > >> > think
> > > > > >> > >>> it's
> > > > > >> > >>> > reasonable to de-prioritize the
> > > > > >> > >>> > standby tasks and just process restoring and active tasks
> > so
> > > > > both
> > > > > >> can
> > > > > >> > >>> make
> > > > > >> > >>> > progress. But we should
> > > > > >> > >>> > let them catch up afterwards somehow -- maybe we can apply
> > > > some
> > > > > >> kind
> > > > > >> > of
> > > > > >> > >>> > heuristic, like "if we haven't
> > > > > >> > >>> > processed standbys for X iterations, or Y milliseconds,
> > do so
> > > > > >> now."
> > > > > >> > >>> >
> > > > > >> > >>> > Actually, it might even be beneficial to avoid processing
> > > > > >> standbys a
> > > > > >> > >>> record
> > > > > >> > >>> > or two at a time and instead
> > > > > >> > >>> > wait for a large enough batch to build up for the RocksDB
> > > > > >> > bulk-loading
> > > > > >> > >>> > benefits.
> > > > > >> > >>> >
> > > > > >> > >>> > I think the "use dedicated threads for standby" is the
> > more
> > > > > >> promising
> > > > > >> > >>> end
> > > > > >> > >>> > goal, especially since
> > > > > >> > >>> > if we split restoration into "restoring tasks" then
> > active and
> > > > > >> > standbys
> > > > > >> > >>> > share almost nothing. But
> > > > > >> > >>> > that seems like follow-up work to the current KIP :)
> > > > > >> > >>> >
> > > > > >> > >>> > On Thu, Aug 8, 2019 at 5:31 PM Sophie Blee-Goldman <
> > > > > >> > >>> sophie@confluent.io>
> > > > > >> > >>> > wrote:
> > > > > >> > >>> >
> > > > > >> > >>> > > Stateful tasks with logging disabled seem to be an
> > > > interesting
> > > > > >> edge
> > > > > >> > >>> case.
> > > > > >> > >>> > > On the one hand,
> > > > > >> > >>> > > for balancing purposes they should be considered
> > stateful
> > > > > since
> > > > > >> as
> > > > > >> > >>> > > Guozhang pointed out
> > > > > >> > >>> > > they are still "heavy" in IO costs. But for "catching
> > up"
> > > > > >> purposes,
> > > > > >> > >>> ie
> > > > > >> > >>> > > when allocating standby
> > > > > >> > >>> > > tasks that will become active tasks, they should be
> > > > considered
> > > > > >> > >>> stateless
> > > > > >> > >>> > > as there is no
> > > > > >> > >>> > > meaningful sense of their lag. We should never allocate
> > > > > standby
> > > > > >> > >>> tasks for
> > > > > >> > >>> > > them during the
> > > > > >> > >>> > > first rebalance, but should ensure they are evenly
> > > > distributed
> > > > > >> > across
> > > > > >> > >>> > > instances. Maybe we
> > > > > >> > >>> > > should split these into a third category -- after we
> > assign
> > > > > all
> > > > > >> > >>> stateful
> > > > > >> > >>> > > tasks with logging, we
> > > > > >> > >>> > > then distribute the set of logging-disabled stateful
> > tasks
> > > > to
> > > > > >> > improve
> > > > > >> > >>> > > balance, before lastly
> > > > > >> > >>> > > distributing stateless tasks?
> > > > > >> > >>> > >
> > > > > >> > >>> > > This actually leads into what I was just thinking,
> > which is
> > > > > >> that we
> > > > > >> > >>> > really
> > > > > >> > >>> > > should distinguish the
> > > > > >> > >>> > > "catch-up" standbys from normal standbys as well as
> > > > > >> distinguishing
> > > > > >> > >>> > > actively processing tasks
> > > > > >> > >>> > > from active tasks that are still in the restore phase.
> > It's
> > > > > >> > somewhat
> > > > > >> > >>> > > awkward that today, some
> > > > > >> > >>> > > active tasks just start processing immediately while
> > others
> > > > > >> behave
> > > > > >> > >>> more
> > > > > >> > >>> > > like standby than active
> > > > > >> > >>> > > tasks for some time, before switching to real active.
> > They
> > > > > first
> > > > > >> > use
> > > > > >> > >>> the
> > > > > >> > >>> > > restoreConsumer, then
> > > > > >> > >>> > > later only the "normal" consumer.
> > > > > >> > >>> > >
> > > > > >> > >>> > > However, this restore period is still distinct from
> > normal
> > > > > >> standbys
> > > > > >> > >>> in a
> > > > > >> > >>> > > lot of ways -- the code path
> > > > > >> > >>> > > for restoring is different than for updating standbys,
> > for
> > > > > >> example
> > > > > >> > >>> in how
> > > > > >> > >>> > > long we block on #poll.
> > > > > >> > >>> > > So in addition to giving them their own name -- let's go
> > > > with
> > > > > >> > >>> restoring
> > > > > >> > >>> > > task for now -- they really
> > > > > >> > >>> > > do seem to deserve being their own distinct task. We can
> > > > > >> optimize
> > > > > >> > >>> them
> > > > > >> > >>> > for
> > > > > >> > >>> > > efficient conversion
> > > > > >> > >>> > > to active tasks since we know that's what they will be.
> > > > > >> > >>> > >
> > > > > >> > >>> > > This resolves some of the awkwardness of dealing with
> > the
> > > > > >> special
> > > > > >> > >>> case
> > > > > >> > >>> > > mentioned above: we
> > > > > >> > >>> > > find a balanced assignment of stateful and stateless
> > tasks,
> > > > > and
> > > > > >> > >>> create
> > > > > >> > >>> > > restoring tasks as needed.
> > > > > >> > >>> > > If logging is disabled, no restoring task is created.
> > > > > >> > >>> > >
> > > > > >> > >>> > >
> > > > > >> > >>> > > On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang <
> > > > > >> wangguoz@gmail.com>
> > > > > >> > >>> wrote:
> > > > > >> > >>> > >
> > > > > >> > >>> > >> Regarding 3) above: I think for active task they should
> > > > still
> > > > > >> be
> > > > > >> > >>> > >> considered
> > > > > >> > >>> > >> stateful since the processor would still pay IO cost
> > > > > accessing
> > > > > >> the
> > > > > >> > >>> > store,
> > > > > >> > >>> > >> but they would not have standby tasks?
> > > > > >> > >>> > >>
> > > > > >> > >>> > >> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna <
> > > > > >> bruno@confluent.io>
> > > > > >> > >>> > wrote:
> > > > > >> > >>> > >>
> > > > > >> > >>> > >> > Hi,
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >> > Thank you for the KIP!
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >> > Some questions/comments:
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >> > 1. I am wondering if the "stand-by" tasks that catch
> > up
> > > > > state
> > > > > >> > >>> before
> > > > > >> > >>> > >> > the active task is switched deserve their own name in
> > this
> > > > > KIP
> > > > > >> and
> > > > > >> > >>> maybe
> > > > > >> > >>> > >> > in the code. We have already stated that they are not
> > > > true
> > > > > >> > >>> stand-by
> > > > > >> > >>> > >> > tasks, they are not configured through
> > > > > >> `num.standby.replicas`,
> > > > > >> > and
> > > > > >> > >>> > >> > maybe they have also other properties that
> > distinguish
> > > > them
> > > > > >> from
> > > > > >> > >>> true
> > > > > >> > >>> > >> > stand-by tasks of which we are not aware yet. For
> > > > example,
> > > > > >> they
> > > > > >> > >>> may be
> > > > > >> > >>> > >> > prioritized differently than other tasks.
> > Furthermore,
> > > > the
> > > > > >> name
> > > > > >> > >>> > >> > "stand-by" does not really fit with the planned
> > > > > >> functionality of
> > > > > >> > >>> those
> > > > > >> > >>> > >> > tasks. In the following, I will call them false
> > stand-by
> > > > > >> tasks.
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >> > 2. Did you consider to trigger the probing
> > rebalances not
> > > > > at
> > > > > >> > >>> regular
> > > > > >> > >>> > >> > time intervals but when the false stand-by tasks
> > reach an
> > > > > >> > >>> acceptable
> > > > > >> > >>> > >> > lag? If you did consider, could you add a paragraph
> > why
> > > > you
> > > > > >> > >>> rejected
> > > > > >> > >>> > >> > this idea to the "Rejected Alternatives" section.
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >> > 3. Are tasks that solely contain stores with disabled
> > > > > logging
> > > > > >> > >>> > >> > classified as stateful or stateless in the
> > algorithm? I
> > > > > would
> > > > > >> > >>> guess
> > > > > >> > >>> > >> > stateless, although if possible they should be
> > assigned
> > > > to
> > > > > >> the
> > > > > >> > >>> same
> > > > > >> > >>> > >> > instance they had run before the rebalance. As far
> > as I
> > > > can
> > > > > >> see
> > > > > >> > >>> this
> > > > > >> > >>> > >> > special case is not handled in the algorithm.
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >> > Best,
> > > > > >> > >>> > >> > Bruno
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang <
> > > > > >> > wangguoz@gmail.com>
> > > > > >> > >>> > >> wrote:
> > > > > >> > >>> > >> > >
> > > > > >> > >>> > >> > > 1. Sounds good, just wanted to clarify; and it may
> > > > worth
> > > > > >> > >>> documenting
> > > > > >> > >>> > >> it
> > > > > >> > >>> > >> > so
> > > > > >> > >>> > >> > > that users would not be surprised when monitoring
> > their
> > > > > >> > >>> footprint.
> > > > > >> > >>> > >> > >
> > > > > >> > >>> > >> > > 2. Hmm I see... I think the trade-off can be
> > described
> > > > as
> > > > > >> "how
> > > > > >> > >>> much
> > > > > >> > >>> > >> > > imbalance would bother you to be willing to pay
> > another
> > > > > >> > >>> rebalance,
> > > > > >> > >>> > >> along
> > > > > >> > >>> > >> > > with potentially more restoration lag", and the
> > current
> > > > > >> > >>> definition
> > > > > >> > >>> > of
> > > > > >> > >>> > >> > > rebalance_factor can be considered as a rough
> > > > measurement
> > > > > >> of
> > > > > >> > >>> that
> > > > > >> > >>> > >> > > imbalance. Of course one can argue that a finer
> > grained
> > > > > >> > >>> measurement
> > > > > >> > >>> > >> could
> > > > > >> > >>> > >> > > be "resource footprint" like CPU / storage of each
> > > > > instance
> > > > > >> > >>> like we
> > > > > >> > >>> > >> have
> > > > > >> > >>> > >> > in
> > > > > >> > >>> > >> > > Kafka broker auto balancing tools, but I'd prefer
> > not
> > > > > doing
> > > > > >> > >>> that as
> > > > > >> > >>> > >> part
> > > > > >> > >>> > >> > of
> > > > > >> > >>> > >> > > the library but more as an operational tool in the
> > > > > future.
> > > > > >> On
> > > > > >> > >>> the
> > > > > >> > >>> > >> other
> > > > > >> > >>> > >> > > hand, I've seen stateful and stateless tasks having
> > > > very
> > > > > >> > >>> different
> > > > > >> > >>> > >> load,
> > > > > >> > >>> > >> > > and sometimes the only bottleneck of a Streams app
> > is
> > > > > just
> > > > > >> one
> > > > > >> > >>> > >> stateful
> > > > > >> > >>> > >> > > sub-topology and whoever gets tasks of that
> > > > sub-topology
> > > > > >> > become
> > > > > >> > >>> > >> hotspot
> > > > > >> > >>> > >> > > (and that's why our algorithm tries to balance per
> > > > > >> > sub-topology
> > > > > >> > >>> as
> > > > > >> > >>> > >> well),
> > > > > >> > >>> > >> > > so maybe we can just consider stateful tasks when
> > > > > >> calculating
> > > > > >> > >>> this
> > > > > >> > >>> > >> factor
> > > > > >> > >>> > >> > > as a very brute force heuristic?
> > > > > >> > >>> > >> > >
> > > > > >> > >>> > >> > > 3.a. Thinking about this a bit more, maybe it's
> > better
> > > > > not
> > > > > >> try
> > > > > >> > >>> to
> > > > > >> > >>> > >> tackle
> > > > > >> > >>> > >> > an
> > > > > >> > >>> > >> > > unseen enemy just yet, and observe if it really
> > emerges
> > > > > >> later,
> > > > > >> > >>> and
> > > > > >> > >>> > by
> > > > > >> > >>> > >> > then
> > > > > >> > >>> > >> > > we may have other ways to not starving the standby
> > > > tasks,
> > > > > >> for
> > > > > >> > >>> > >> example, by
> > > > > >> > >>> > >> > > using dedicate threads for standby tasks or even
> > > > consider
> > > > > >> > having
> > > > > >> > >>> > >> higher
> > > > > >> > >>> > >> > > priority for standby than active so that we always
> > try
> > > > to
> > > > > >> > >>> caught up
> > > > > >> > >>> > >> > standby
> > > > > >> > >>> > >> > > first, then process active; and if active's lagging
> > > > > >> compared
> > > > > >> > to
> > > > > >> > >>> > >> > > log-end-offset is increasing then we should
> > increase
> > > > > >> capacity,
> > > > > >> > >>> etc
> > > > > >> > >>> > >> etc.
> > > > > >> > >>> > >> > >
> > > > > >> > >>> > >> > > 4. Actually with KIP-429 this may not be the case:
> > we
> > > > may
> > > > > >> not
> > > > > >> > >>> call
> > > > > >> > >>> > >> > > onPartitionsRevoked prior to rebalance any more so
> > > > would
> > > > > >> not
> > > > > >> > >>> transit
> > > > > >> > >>> > >> > state
> > > > > >> > >>> > >> > > to PARTITIONS_REVOKED, and hence not cause the
> > state of
> > > > > the
> > > > > >> > >>> instance
> > > > > >> > >>> > >> to
> > > > > >> > >>> > >> > be
> > > > > >> > >>> > >> > > REBALANCING. In other words, even if an instance is
> > > > > >> undergoing
> > > > > >> > a
> > > > > >> > >>> > >> rebalance
> > > > > >> > >>> > >> > > it's state may still be RUNNING and it may still be
> > > > > >> processing
> > > > > >> > >>> > >> records at
> > > > > >> > >>> > >> > > the same time.
> > > > > >> > >>> > >> > >
> > > > > >> > >>> > >> > >
> > > > > >> > >>> > >> > > On Wed, Aug 7, 2019 at 12:14 PM John Roesler <
> > > > > >> > john@confluent.io
> > > > > >> > >>> >
> > > > > >> > >>> > >> wrote:
> > > > > >> > >>> > >> > >
> > > > > >> > >>> > >> > > > Hey Guozhang,
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > Thanks for the review!
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > 1. Yes, even with `num.standby.replicas := 0`, we
> > > > will
> > > > > >> still
> > > > > >> > >>> > >> > temporarily
> > > > > >> > >>> > >> > > > allocate standby tasks to accomplish a
> > no-downtime
> > > > task
> > > > > >> > >>> migration.
> > > > > >> > >>> > >> > > > Although, I'd argue that this doesn't really
> > violate
> > > > > the
> > > > > >> > >>> config,
> > > > > >> > >>> > as
> > > > > >> > >>> > >> the
> > > > > >> > >>> > >> > > > task isn't a true hot standby. As soon as it
> > catches
> > > > > up,
> > > > > >> > we'll
> > > > > >> > >>> > >> > rebalance
> > > > > >> > >>> > >> > > > again, that task will become active, and the
> > original
> > > > > >> > instance
> > > > > >> > >>> > that
> > > > > >> > >>> > >> > hosted
> > > > > >> > >>> > >> > > > the active task will no longer have the task
> > assigned
> > > > > at
> > > > > >> > all.
> > > > > >> > >>> Once
> > > > > >> > >>> > >> the
> > > > > >> > >>> > >> > > > stateDirCleaner kicks in, we'll free the disk
> > space
> > > > > from
> > > > > >> it,
> > > > > >> > >>> and
> > > > > >> > >>> > >> > return to
> > > > > >> > >>> > >> > > > the steady-state of having just one copy of the
> > task
> > > > in
> > > > > >> the
> > > > > >> > >>> > cluster.
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > We can of course do without this, but I feel the
> > > > > current
> > > > > >> > >>> proposal
> > > > > >> > >>> > is
> > > > > >> > >>> > >> > > > operationally preferable, since it doesn't make
> > > > > >> configuring
> > > > > >> > >>> > >> > hot-standbys a
> > > > > >> > >>> > >> > > > pre-requisite for fast rebalances.
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > 2. Yes, I think your interpretation is what we
> > > > > intended.
> > > > > >> The
> > > > > >> > >>> > default
> > > > > >> > >>> > >> > > > balance_factor would be 1, as it is implicitly
> > today.
> > > > > >> What
> > > > > >> > >>> this
> > > > > >> > >>> > >> does is
> > > > > >> > >>> > >> > > > allows operators to trade off less balanced
> > > > assignments
> > > > > >> > >>> against
> > > > > >> > >>> > >> fewer
> > > > > >> > >>> > >> > > > rebalances. If you have lots of spare capacity in
> > > > your
> > > > > >> > >>> instances,
> > > > > >> > >>> > >> this
> > > > > >> > >>> > >> > may
> > > > > >> > >>> > >> > > > be a perfectly fine tradeoff, and you may prefer
> > for
> > > > > >> Streams
> > > > > >> > >>> not
> > > > > >> > >>> > to
> > > > > >> > >>> > >> > bother
> > > > > >> > >>> > >> > > > streaming GBs of data from the broker in pursuit
> > of
> > > > > >> perfect
> > > > > >> > >>> > balance.
> > > > > >> > >>> > >> > Not
> > > > > >> > >>> > >> > > > married to this configuration, though. It was
> > > > inspired
> > > > > by
> > > > > >> > the
> > > > > >> > >>> > >> related
> > > > > >> > >>> > >> > work
> > > > > >> > >>> > >> > > > research we did.
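> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > As a tiny, purely hypothetical sketch of the intent
> > > > > >> > >>> > >> > > > (numStatefulTasks and balanceFactor are made-up names,
> > > > > >> > >>> > >> > > > not actual code):
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > >     // "balanced enough": no instance has more than
> > > > > >> > >>> > >> > > >     // balance_factor more stateful tasks than any other
> > > > > >> > >>> > >> > > >     final int max = instances.stream().mapToInt(Instance::numStatefulTasks).max().orElse(0);
> > > > > >> > >>> > >> > > >     final int min = instances.stream().mapToInt(Instance::numStatefulTasks).min().orElse(0);
> > > > > >> > >>> > >> > > >     final boolean balancedEnough = (max - min) <= balanceFactor; // default 1
> > > > > >> > >>> > >> > > >     // if balancedEnough, don't trigger more task movements
> > > > > >> > >>> > >> > > >     // just to shave off the remaining imbalance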
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > 3. I'll take a look
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > 3a. I think this is a good idea. I'd classify it
> > as a
> > > > > >> type
> > > > > >> > of
> > > > > >> > >>> grey
> > > > > >> > >>> > >> > failure
> > > > > >> > >>> > >> > > > detection. It may make more sense to tackle grey
> > > > > >> failures as
> > > > > >> > >>> part
> > > > > >> > >>> > of
> > > > > >> > >>> > >> > the
> > > > > >> > >>> > >> > > > heartbeat protocol (as I POCed here:
> > > > > >> > >>> > >> > > > https://github.com/apache/kafka/pull/7096/files
> > ).
> > > > > WDYT?
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > 4. Good catch! I didn't think about that before.
> > > > > Looking
> > > > > >> at
> > > > > >> > it
> > > > > >> > >>> > now,
> > > > > >> > >>> > >> > though,
> > > > > >> > >>> > >> > > > I wonder if we're actually protected already. The
> > > > > >> > >>> stateDirCleaner
> > > > > >> > >>> > >> > thread
> > > > > >> > >>> > >> > > > only executes if the instance is in RUNNING
> > state,
> > > > and
> > > > > >> > KIP-441
> > > > > >> > >>> > >> > proposes to
> > > > > >> > >>> > >> > > > use "probing rebalances" to report task lag.
> > Hence,
> > > > > >> during
> > > > > >> > the
> > > > > >> > >>> > >> window
> > > > > >> > >>> > >> > > > between when the instance reports a lag and the
> > > > > assignor
> > > > > >> > >>> makes a
> > > > > >> > >>> > >> > decision
> > > > > >> > >>> > >> > > > about it, the instance should remain in
> > REBALANCING
> > > > > >> state,
> > > > > >> > >>> right?
> > > > > >> > >>> > If
> > > > > >> > >>> > >> > so,
> > > > > >> > >>> > >> > > > then this should prevent the race condition. If
> > not,
> > > > > >> then we
> > > > > >> > >>> do
> > > > > >> > >>> > >> indeed
> > > > > >> > >>> > >> > need
> > > > > >> > >>> > >> > > > to do something about it.
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > 5. Good idea. I think that today, you can only
> > see
> > > > the
> > > > > >> > >>> consumer
> > > > > >> > >>> > lag,
> > > > > >> > >>> > >> > which
> > > > > >> > >>> > >> > > > is a poor substitute. I'll add some metrics to
> > the
> > > > > >> proposal.
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > Thanks again for the comments!
> > > > > >> > >>> > >> > > > -John
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > On Tue, Aug 6, 2019 at 4:27 PM Guozhang Wang <
> > > > > >> > >>> wangguoz@gmail.com>
> > > > > >> > >>> > >> > wrote:
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > > > > Hello Sophie,
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > Thanks for the proposed KIP. I left some
> > comments
> > > > on
> > > > > >> the
> > > > > >> > >>> wiki
> > > > > >> > >>> > >> itself,
> > > > > >> > >>> > >> > > > and I
> > > > > >> > >>> > >> > > > > think I'm still not very clear on a couple or
> > > > those:
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > 1. With this proposal, does that mean with
> > > > > >> > >>> num.standby.replicas
> > > > > >> > >>> > ==
> > > > > >> > >>> > >> > 0, we
> > > > > >> > >>> > >> > > > > may sometimes still have some standby tasks
> > which
> > > > may
> > > > > >> > >>> violate
> > > > > >> > >>> > the
> > > > > >> > >>> > >> > config?
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > 2. I think I understand the rationale to
> > consider
> > > > > lags
> > > > > >> > that
> > > > > >> > >>> is
> > > > > >> > >>> > >> below
> > > > > >> > >>> > >> > the
> > > > > >> > >>> > >> > > > > specified threshold to be equal, rather than
> > still
> > > > > >> > >>> considering
> > > > > >> > >>> > >> 5000
> > > > > >> > >>> > >> > is
> > > > > >> > >>> > >> > > > > better than 5001 -- we do not want to
> > > > "over-optimize"
> > > > > >> and
> > > > > >> > >>> > >> potentially
> > > > > >> > >>> > >> > > > falls
> > > > > >> > >>> > >> > > > > into endless rebalances back and forth.
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > But I'm not clear about the rationale of the
> > second
> > > > > >> > >>> parameter of
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>>
> > constrainedBalancedAssignment(StatefulTasksToRankedCandidates,
> > > > > >> > >>> > >> > > > > balance_factor):
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > Does that mean, e.g. with balance_factor of 3,
> > we'd
> > > > > >> > >>> consider two
> > > > > >> > >>> > >> > > > > assignments one resulting balance_factor 0 and
> > one
> > > > > >> > resulting
> > > > > >> > >>> > >> > > > balance_factor
> > > > > >> > >>> > >> > > > > 3 to be equally optimized assignment and
> > therefore
> > > > > may
> > > > > >> > "stop
> > > > > >> > >>> > >> early"?
> > > > > >> > >>> > >> > This
> > > > > >> > >>> > >> > > > > was not very convincing to me :P
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > 3. There are a couple of minor comments about
> > the
> > > > > >> > algorithm
> > > > > >> > >>> > >> itself,
> > > > > >> > >>> > >> > left
> > > > > >> > >>> > >> > > > on
> > > > > >> > >>> > >> > > > > the wiki page since it needs to refer to the
> > exact
> > > > > line
> > > > > >> > and
> > > > > >> > >>> > better
> > > > > >> > >>> > >> > > > > displayed there.
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > 3.a Another wild thought about the threshold
> > > > itself:
> > > > > >> today
> > > > > >> > >>> the
> > > > > >> > >>> > >> > assignment
> > > > > >> > >>> > >> > > > > itself is memoryless, so we would not know if
> > the
> > > > > >> reported
> > > > > >> > >>> > >> `TaskLag`
> > > > > >> > >>> > >> > > > itself
> > > > > >> > >>> > >> > > > > is increasing or decreasing even if the current
> > > > value
> > > > > >> is
> > > > > >> > >>> under
> > > > > >> > >>> > the
> > > > > >> > >>> > >> > > > > threshold. I wonder if it worthy to make it a
> > bit
> > > > > more
> > > > > >> > >>> > >> complicated to
> > > > > >> > >>> > >> > > > track
> > > > > >> > >>> > >> > > > > task lag trend at the assignor? Practically it
> > may
> > > > > not
> > > > > >> be
> > > > > >> > >>> very
> > > > > >> > >>> > >> > uncommon
> > > > > >> > >>> > >> > > > > that stand-by tasks are not keeping up due to
> > the
> > > > > fact
> > > > > >> > that
> > > > > >> > >>> > other
> > > > > >> > >>> > >> > active
> > > > > >> > >>> > >> > > > > tasks hosted on the same thread is starving the
> > > > > standby
> > > > > >> > >>> tasks.
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > 4. There's a potential race condition risk when
> > > > > >> reporting
> > > > > >> > >>> > >> `TaskLags`
> > > > > >> > >>> > >> > in
> > > > > >> > >>> > >> > > > the
> > > > > >> > >>> > >> > > > > subscription: right after reporting it to the
> > > > leader,
> > > > > >> the
> > > > > >> > >>> > cleanup
> > > > > >> > >>> > >> > thread
> > > > > >> > >>> > >> > > > > kicks in and deletes the state directory. If
> > the
> > > > task
> > > > > >> was
> > > > > >> > >>> > assigned
> > > > > >> > >>> > >> > to the
> > > > > >> > >>> > >> > > > > host it would cause it to restore from
> > beginning
> > > > and
> > > > > >> > >>> effectively
> > > > > >> > >>> > >> > make the
> > > > > >> > >>> > >> > > > > seemingly optimized assignment very
> > sub-optimal.
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > To be on the safer side we should consider
> > either
> > > > > prune
> > > > > >> > out
> > > > > >> > >>> > those
> > > > > >> > >>> > >> > tasks
> > > > > >> > >>> > >> > > > > that are "close to be cleaned up" in the
> > > > > subscription,
> > > > > >> or
> > > > > >> > we
> > > > > >> > >>> > >> should
> > > > > >> > >>> > >> > delay
> > > > > >> > >>> > >> > > > > the cleanup right after we've included them in
> > the
> > > > > >> > >>> subscription
> > > > > >> > >>> > in
> > > > > >> > >>> > >> > case
> > > > > >> > >>> > >> > > > > they are been selected as assigned tasks by the
> > > > > >> assignor.
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > 5. This is a meta comment: I think it would be
> > > > > helpful
> > > > > >> to
> > > > > >> > >>> add
> > > > > >> > >>> > some
> > > > > >> > >>> > >> > user
> > > > > >> > >>> > >> > > > > visibility on the standby tasks lagging as
> > well,
> > > > via
> > > > > >> > >>> metrics for
> > > > > >> > >>> > >> > example.
> > > > > >> > >>> > >> > > > > Today it is hard for us to observe how far are
> > our
> > > > > >> current
> > > > > >> > >>> > standby
> > > > > >> > >>> > >> > tasks
> > > > > >> > >>> > >> > > > > compared to the active tasks and whether that
> > lag
> > > > is
> > > > > >> being
> > > > > >> > >>> > >> > increasing or
> > > > > >> > >>> > >> > > > > decreasing. As a follow-up task, for example,
> > the
> > > > > >> > rebalance
> > > > > >> > >>> > should
> > > > > >> > >>> > >> > also
> > > > > >> > >>> > >> > > > be
> > > > > >> > >>> > >> > > > > triggered if we realize that some standby
> > task's
> > > > lag
> > > > > is
> > > > > >> > >>> > increasing
> > > > > >> > >>> > >> > > > > indefinitely means that it cannot keep up
> > (which is
> > > > > >> > another
> > > > > >> > >>> > >> indicator
> > > > > >> > >>> > >> > > > > either you need to add more resources with the
> > > > > >> > num.standbys
> > > > > >> > >>> or
> > > > > >> > >>> > >> your
> > > > > >> > >>> > >> > are
> > > > > >> > >>> > >> > > > > still not balanced enough).
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > On Tue, Aug 6, 2019 at 1:32 PM Sophie
> > Blee-Goldman
> > > > <
> > > > > >> > >>> > >> > sophie@confluent.io>
> > > > > >> > >>> > >> > > > > wrote:
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > > Hey all,
> > > > > >> > >>> > >> > > > > >
> > > > > >> > >>> > >> > > > > > I'd like to kick off discussion on KIP-441,
> > aimed
> > > > > at
> > > > > >> the
> > > > > >> > >>> long
> > > > > >> > >>> > >> > restore
> > > > > >> > >>> > >> > > > > times
> > > > > >> > >>> > >> > > > > > in Streams during which further active
> > processing
> > > > > >> and IQ
> > > > > >> > >>> are
> > > > > >> > >>> > >> > blocked.
> > > > > >> > >>> > >> > > > > > Please give it a read and let us know your
> > > > thoughts
> > > > > >> > >>> > >> > > > > >
> > > > > >> > >>> > >> > > > > >
> > > > > >> > >>> > >> > > > > >
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >>
> > > > > >> > >>> >
> > > > > >> > >>>
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > > > > >> > >>> > >> > > > > >
> > > > > >> > >>> > >> > > > > > Cheers,
> > > > > >> > >>> > >> > > > > > Sophie
> > > > > >> > >>> > >> > > > > >
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > > > --
> > > > > >> > >>> > >> > > > > -- Guozhang
> > > > > >> > >>> > >> > > > >
> > > > > >> > >>> > >> > > >
> > > > > >> > >>> > >> > >
> > > > > >> > >>> > >> > >
> > > > > >> > >>> > >> > > --
> > > > > >> > >>> > >> > > -- Guozhang
> > > > > >> > >>> > >> >
> > > > > >> > >>> > >>
> > > > > >> > >>> > >>
> > > > > >> > >>> > >> --
> > > > > >> > >>> > >> -- Guozhang
> > > > > >> > >>> > >>
> > > > > >> > >>> > >
> > > > > >> > >>> >
> > > > > >> > >>>
> > > > > >> > >>>
> > > > > >> > >>> --
> > > > > >> > >>> -- Guozhang
> > > > > >> > >>>
> > > > > >> > >>
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> -- Guozhang
> > > > > >>
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> >
> >

