kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: KIP-457: Add DISCONNECTED state to Kafka Streams
Date Wed, 12 Jun 2019 20:28:58 GMT
Hi Richard,

Sorry for being late on this; I've finally had some time to take a look
at https://github.com/apache/kafka/pull/6594 as well as the KIP itself.
Here are some thoughts:

1. The main motivation of this KIP is to be able to distinguish three cases:

a. "The Streams client is in an unhealthy situation and hence cannot proceed"
(for which we have the ERROR state),
b. "The Streams client is perfectly healthy, but it cannot reach the target
brokers and hence cannot proceed", and
c. "Both Streams and the brokers are healthy; there is just no data available
for processing, and hence the client cannot proceed".

And we want a way to notify users about the second case, b), as
distinct from the others.

2. Following this, when I first thought about a solution I was thinking
about adding a new state to the FSM of Kafka Streams, but after reviewing
the code and the KIP, I feel that complicating the FSM may be overkill.
Now I'm wondering if we can achieve the same thing with a single metric.
For example:

2.a) We know that Streams always relies on consumer group membership to
allocate partitions to instances, which means that the heartbeat thread has
to be working if the consumer is ever to receive any data. So we can
let users monitor the consumer's heartbeat-rate metric directly: e.g., if
heartbeat-rate drops to zero BUT the state is still RUNNING, it means we
are in case b) above.
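
As a rough illustration of the check in 2.a, the rule "heartbeat-rate is
zero while the state is RUNNING" could be sketched as below. This is only a
sketch under assumptions: ConnectivityCheck, Diagnosis, and diagnose are
hypothetical names and not part of Kafka Streams, and the caller is assumed
to have already read the state name (e.g. from KafkaStreams#state()) and the
consumer's heartbeat-rate value from its metrics.

```java
/** Hypothetical helper; not part of Kafka Streams or the KIP. */
final class ConnectivityCheck {

    /** Coarse labels for the cases a), b), and "everything else" from point 1. */
    enum Diagnosis { UNHEALTHY, POSSIBLY_DISCONNECTED, OK_OR_IDLE }

    /**
     * @param streamsState  name of the current Streams state, e.g. "RUNNING" or "ERROR"
     * @param heartbeatRate consumer heartbeat-rate in heartbeats per second
     */
    static Diagnosis diagnose(String streamsState, double heartbeatRate) {
        if ("ERROR".equals(streamsState)) {
            return Diagnosis.UNHEALTHY;              // case a)
        }
        // !(rate > 0) also treats NaN (no samples recorded) as "no heartbeats".
        if ("RUNNING".equals(streamsState) && !(heartbeatRate > 0.0)) {
            return Diagnosis.POSSIBLY_DISCONNECTED;  // likely case b)
        }
        // Healthy or simply idle; case c) cannot be told apart from normal
        // operation with this metric alone.
        return Diagnosis.OK_OR_IDLE;
    }
}
```

For example, diagnose("RUNNING", 0.0) yields POSSIBLY_DISCONNECTED, while
diagnose("RUNNING", 0.33) yields OK_OR_IDLE. A real monitor would probably
want the rate to stay at zero for some time before raising an alert.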

2.b) If we want to provide a Streams-level metric out of the box rather
than having users monitor consumer metrics, another idea is to
leverage the existing "public Set<TopicPartition> assignment()" of
KafkaConsumer and record the time when it returns empty, meaning that
nothing was assigned. We could then expose this as a boolean metric indicating
that nothing is assigned and hence we are likely in case b) above. Note this
could also mean that we have fewer partitions than instances, so that some
instance legitimately has no assignment, which is not the same as b),
but I feel that consolidating these two cases into a single metric is also fine.
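
The bookkeeping behind 2.b might look roughly like the following sketch. It
is not taken from the PR: EmptyAssignmentTracker and its methods are
hypothetical names, and the caller is assumed to pass in the result of
KafkaConsumer#assignment() together with a timestamp after each poll.

```java
import java.util.Set;

/** Hypothetical tracker; not part of Kafka Streams or the KIP. */
final class EmptyAssignmentTracker {

    // Timestamp at which the assignment was first seen empty, or -1 if it is not empty.
    private long emptySinceMs = -1L;

    /** Record the latest assignment snapshot observed at time nowMs. */
    void record(Set<?> assignment, long nowMs) {
        if (assignment.isEmpty()) {
            if (emptySinceMs < 0) {
                emptySinceMs = nowMs;   // remember only the first empty observation
            }
        } else {
            emptySinceMs = -1L;         // something is assigned again: reset
        }
    }

    /** The boolean metric: true while nothing is assigned. */
    boolean nothingAssigned() {
        return emptySinceMs >= 0;
    }

    /** How long the assignment has been empty, or 0 if it is not empty. */
    long emptyDurationMs(long nowMs) {
        return emptySinceMs < 0 ? 0L : nowMs - emptySinceMs;
    }
}
```

Keeping the timestamp rather than only a flag lets a monitor ignore short
empty periods, such as those during a rebalance, and alert only when the
assignment stays empty for longer than some threshold.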



Guozhang




On Wed, Apr 17, 2019 at 2:30 PM Richard Yu <yohan.richard.yu@gmail.com>
wrote:

> Alright, so I made a few changes to the KIP.
> I realized that there might be an easier way to give the user information
> on the connection state of Kafka Streams.
> In implementation, if one wishes to have DISCONNECTED as a state, then one
> would have to factor in proper state transitions.
> The other approach is now outlined in the KIP: instead, we could just
> add a method, which I think achieves the same effect.
> If any of you think there is something wrong with this approach, please let me know.
> :)
>
> Cheers,
> Richard
>
> On Wed, Apr 17, 2019 at 11:49 AM Richard Yu <yohan.richard.yu@gmail.com>
> wrote:
>
> > I just realized something.
> >
> > Hi Matthias, might need your input here.
> > I realized that when implementing this change, as noted in the JIRA, we
> > would need to "check the behaviour of the consumer", since it's the consumer's
> > connection with the broker that we are dealing with.
> >
> > So doesn't that mean we would also be dealing with consumer API changes
> > as well?
> > I don't think the consumer has any methods that would give us the state of a
> > connection either.
> >
> > - Richard
> >
> > On Wed, Apr 17, 2019 at 8:43 AM Richard Yu <yohan.richard.yu@gmail.com>
> > wrote:
> >
> >> Hi Michael,
> >>
> >> Yeah, those are some points I should've clarified.
> >> No problem, I've got it done.
> >>
> >>
> >>
> >> On Wed, Apr 17, 2019 at 6:42 AM Michael Noll <michael@confluent.io>
> >> wrote:
> >>
> >>> Richard,
> >>>
> >>> thanks for looking into this!
> >>>
> >>> However, I have some concerns. The KIP you created (
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams
> >>> )
> >>> doesn't yet address open questions such as the ones mentioned by
> >>> Matthias:
> >>>
> >>> 1) What is the difference between DEAD and the proposed DISCONNECTED?
> >>> This should be defined in the KIP.
> >>>
> >>> 2) Difference between your KIP and the JIRA (
> >>> https://issues.apache.org/jira/browse/KAFKA-6520): In the JIRA ticket,
> >>> the DISCONNECTED state was proposed for the scenario that the KStreams
> >>> application is healthy but the Kafka broker is down. This is different
> >>> to what you wrote in the KIP: "When something happens in Kafka Streams,
> >>> such as an unexpected crash or error, KafkaStreams#state() will return
> >>> State.DISCONNECTED.", which seems to mean that DISCONNECTED should be
> >>> the state when the KStreams app is down.
> >>>
> >>> I wouldn't expect a KIP vote to pass if these basic questions aren't
> >>> properly sorted out in the KIP.
> >>>
> >>> Best,
> >>> Michael
> >>>
> >>>
> >>>
> >>> On Wed, Apr 17, 2019 at 3:35 AM Richard Yu <yohan.richard.yu@gmail.com>
> >>> wrote:
> >>>
> >>> > Hi all,
> >>> >
> >>> > Considering that this is a simple KIP, I would probably start the
> >>> > voting tomorrow.
> >>> > I think it would be good if we could get this in fast.
> >>> >
> >>> > On Tue, Apr 16, 2019 at 3:31 PM Richard Yu <yohan.richard.yu@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > Oh, I probably misunderstood the difference between DISCONNECTED
> >>> > > and DEAD.
> >>> > > I will update the KIP accordingly.
> >>> > > Thanks for pointing that out!
> >>> > >
> >>> > >
> >>> > > On Tue, Apr 16, 2019 at 3:13 PM Matthias J. Sax <matthias@confluent.io>
> >>> > > wrote:
> >>> > >
> >>> > >> Thanks for the initiative.
> >>> > >>
> >>> > >> In the motivation you mention that you want to use DISCONNECT
> >>> > >> to indicate that the application was killed.
> >>> > >>
> >>> > >> What is the difference to the existing state DEAD?
> >>> > >>
> >>> > >> Also, the backing JIRA seems to have a different motivation
> >>> > >> to add a DISCONNECT state. There, the Kafka Streams application itself
> >>> > >> is healthy, but it cannot connect to the brokers. It seems reasonable
> >>> > >> to add a DISCONNECT for this case though.
> >>> > >>
> >>> > >>
> >>> > >>
> >>> > >> -Matthias
> >>> > >>
> >>> > >>
> >>> > >>
> >>> > >> On 4/16/19 9:30 AM, Richard Yu wrote:
> >>> > >> > Hi all,
> >>> > >> >
> >>> > >> > I'd like to propose a small KIP on adding a new state to
> >>> > >> > KafkaStreams#state().
> >>> > >> > It is very simple, so this should pass relatively quickly!
> >>> > >> > Here is the discussion link:
> >>> > >> >
> >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams
> >>> > >> >
> >>> > >> > Cheers,
> >>> > >> > Richard
> >>> > >> >
> >>> > >>
> >>> > >>
> >>> >
> >>>
> >>
>


-- 
-- Guozhang
