kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP 130: Expose states of active tasks to KafkaStreams public API
Date Fri, 21 Apr 2017 20:20:21 GMT
I'd agree with Matthias.

@Eno do you have a specific use case in mind that would justify keeping the
`toString` function?


Guozhang
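
For the sub-topology/task link raised in the quoted discussion below, here is a
minimal sketch of how a caller might derive the sub-topology id from a task id.
It assumes the task id prints as `<subtopologyId>_<partition>`; the thread only
describes the separator loosely as "DASH", so treat the underscore as an
assumption. The class and method names (`TaskIdLink`, `subtopologyIdOf`,
`partitionOf`, `groupBySubtopology`) are hypothetical helpers, not part of the
Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaskIdLink {

    // Assumption: a task id prints as "<subtopologyId>_<partition>", e.g. "1_3".
    public static int subtopologyIdOf(String taskId) {
        int sep = taskId.indexOf('_');
        if (sep <= 0 || sep == taskId.length() - 1) {
            throw new IllegalArgumentException("Unexpected task id: " + taskId);
        }
        return Integer.parseInt(taskId.substring(0, sep));
    }

    // Returns the partition part of the task id (the number after the separator).
    public static int partitionOf(String taskId) {
        subtopologyIdOf(taskId); // reuse the format check above
        return Integer.parseInt(taskId.substring(taskId.indexOf('_') + 1));
    }

    // Groups task ids by the sub-topology they execute, ordered by sub-topology id.
    public static Map<Integer, List<String>> groupBySubtopology(List<String> taskIds) {
        Map<Integer, List<String>> grouped = new TreeMap<>();
        for (String id : taskIds) {
            grouped.computeIfAbsent(subtopologyIdOf(id), k -> new ArrayList<>()).add(id);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String> taskIds = Arrays.asList("0_0", "0_1", "1_0");
        // prints {0=[0_0, 0_1], 1=[1_0]}
        System.out.println(groupBySubtopology(taskIds));
    }
}
```

With an explicit id on the sub-topology description, as proposed in the thread,
this kind of string parsing would not be needed; the sketch only illustrates why
the link is "not that straightforward" without it.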


On Fri, Apr 21, 2017 at 11:41 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> It would not give the same information as the new API. Thus, it would be
> inconsistent (and this would be really bad IMHO.)
>
>
> I would really like to remove (i.e., deprecate) it. It was a "hot fix" to
> give some runtime information to the user. But with this KIP, we get a
> proper first-class API, and thus .toString() is simply outdated.
>
> >>>> It's always preferable to have an implementation for toString method.
>
> Not sure about this btw.
>
> Furthermore, if anyone wants to print runtime information, we have
> ThreadMetadata#toString() and TaskMetadata#toString().
>
>
>
>
> -Matthias
>
> On 4/20/17 9:36 AM, Eno Thereska wrote:
> > Hi there,
> >
> > Can't toString() just stay as is?
> >
> > Thanks
> > Eno
> >
> >> On 20 Apr 2017, at 17:26, Matthias J. Sax <matthias@confluent.io> wrote:
> >>
> >> Florian,
> >>
> >> I am just wondering: if we keep .toString(), what should the
> >> implementation look like?
> >>
> >>
> >> -Matthias
> >>
> >> On 4/19/17 2:42 PM, Florian Hussonnois wrote:
> >>> Hi Matthias,
> >>>
> >>> So sorry for the delay in replying to you. For now, I think we can keep
> >>> KafkaStreams#toString() as it is.
> >>> It's always preferable to have an implementation for toString method.
> >>>
> >>> 2017-04-14 4:08 GMT+02:00 Matthias J. Sax <matthias@confluent.io>:
> >>>
> >>>> Florian,
> >>>>
> >>>>>>> What about KafkaStreams#toString() method?
> >>>>>>>
> >>>>>>> I think we want to deprecate it, since with KIP-120 and the changes of
> >>>>>>> this KIP it becomes obsolete.
> >>>>
> >>>> Any thoughts about this? For me, this is the last open point to discuss
> >>>> (or what should be reflected in the KIP in case you agree) before I can
> >>>> put my vote on the VOTE thread you already started.
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 4/11/17 12:18 AM, Damian Guy wrote:
> >>>>> Hi Florian,
> >>>>>
> >>>>> Thanks for the updates. The KIP is looking good.
> >>>>>
> >>>>> Cheers,
> >>>>> Damian
> >>>>>
> >>>>> On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>
> >>>>>> What about KafkaStreams#toString() method?
> >>>>>>
> >>>>>> I think we want to deprecate it, since with KIP-120 and the changes of
> >>>>>> this KIP it becomes obsolete.
> >>>>>>
> >>>>>> If we do so, please update the KIP accordingly.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 3/28/17 7:00 PM, Matthias J. Sax wrote:
> >>>>>>> Thanks for updating the KIP!
> >>>>>>>
> >>>>>>> I think it's good as is -- I would not add anything more to TaskMetadata.
> >>>>>>>
> >>>>>>> About subtopologies and tasks: we already have the concept of
> >>>>>>> subtopologies in KIP-120. It's only missing an ID that allows linking a
> >>>>>>> subtopology to a task.
> >>>>>>>
> >>>>>>> IMHO, adding a simple variable to `Subtopology` that provides the id
> >>>>>>> should be sufficient. We can simply document in the JavaDocs how
> >>>>>>> Subtopology and TaskMetadata can be linked to each other.
> >>>>>>>
> >>>>>>> I did update KIP-120 accordingly.
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 3/28/17 3:45 PM, Florian Hussonnois wrote:
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I've updated the KIP and the PR to reflect your suggestions.
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> >>>>>>>> https://github.com/apache/kafka/pull/2612
> >>>>>>>>
> >>>>>>>> Also, I've exposed the StreamThread#state property as a string through
> >>>>>>>> the new class ThreadMetadata.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>>
> >>>>>>>> 2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonnois@gmail.com>:
> >>>>>>>>
> >>>>>>>>    Hi Guozhang, Matthias,
> >>>>>>>>
> >>>>>>>>    It's a great idea to add sub-topology descriptions. This would
> >>>>>>>>    help developers better understand the topology concept.
> >>>>>>>>
> >>>>>>>>    I agree that it is not really user-friendly to have to check whether
> >>>>>>>>    `StreamsMetadata#streamThreads` returns null.
> >>>>>>>>
> >>>>>>>>    The method name localThreadsMetadata looks good. In addition, it's
> >>>>>>>>    simpler to build ThreadMetadata instances from the `StreamTask`
> >>>>>>>>    class than from the `StreamPartitionAssignor` class.
> >>>>>>>>
> >>>>>>>>    I will work on the modifications. As I understand it, I have to add
> >>>>>>>>    the subTopologyId property to the TaskMetadata class. Am I right?
> >>>>>>>>
> >>>>>>>>    Thanks,
> >>>>>>>>
> >>>>>>>>    2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangguoz@gmail.com>:
> >>>>>>>>
> >>>>>>>>        Re 1): this is a good point. Maybe we can move
> >>>>>>>>        `StreamsMetadata#streamThreads` to
> >>>>>>>>        `KafkaStreams#localThreadsMetadata`?
> >>>>>>>>
> >>>>>>>>        3): a minor suggestion about the function name: rename
> >>>>>>>>        `assignedPartitions` to `topicPartitions`, to be consistent with
> >>>>>>>>        `StreamsMetadata`?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>        Guozhang
> >>>>>>>>
> >>>>>>>>        On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax
> >>>>>>>>        <matthias@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>            Thanks for the progress on this KIP. I think we are on the
> >>>>>>>>            right path!
> >>>>>>>>
> >>>>>>>>            A couple of comments/questions:
> >>>>>>>>
> >>>>>>>>            (1) Why do we not consider the "rejected alternative" to add
> >>>>>>>>            the method to KafkaStreams? The comment on #streamThreads() says:
> >>>>>>>>
> >>>>>>>>            "Note this method will return <code>null</code> if called on
> >>>>>>>>            {@link StreamsMetadata} which represent a remote application."
> >>>>>>>>
> >>>>>>>>            Thus, if we cannot get any remote metadata, it seems more
> >>>>>>>>            straightforward to add it to KafkaStreams directly -- this would
> >>>>>>>>            avoid invalid calls and a `null` return value in the first place.
> >>>>>>>>
> >>>>>>>>            I like the idea of exposing sub-topologies:
> >>>>>>>>
> >>>>>>>>            (2a) I would recommend renaming `topicsGroupId` to
> >>>>>>>>            `subTopologyId` :)
> >>>>>>>>
> >>>>>>>>            (2b) We could add this to KIP-120 already. However, I would
> >>>>>>>>            not just link both via name, but leverage KIP-120 directly and
> >>>>>>>>            add a "Subtopology" member to the TaskMetadata class.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>            Overall, I like the distinction of KIP-120 only exposing
> >>>>>>>>            "static" information that can be determined before the topology
> >>>>>>>>            gets started, while this KIP allows access to runtime information.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>            -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>            On 3/22/17 12:42 PM, Guozhang Wang wrote:
> >>>>>>>>> Thanks for the updated KIP, and sorry for the late replies!
> >>>>>>>>>
> >>>>>>>>> I thought a little bit more about KIP-130, and I feel that if we are
> >>>>>>>>> going to deprecate the `toString` function (it is not explicitly said
> >>>>>>>>> in the KIP, so I'm not sure if you plan to still keep
> >>>>>>>>> `KafkaStreams#toString` as is or are going to replace it with the
> >>>>>>>>> proposed APIs) in favor of the proposed ones, it may be okay. More
> >>>>>>>>> specifically, after both KIP-120 and KIP-130:
> >>>>>>>>>
> >>>>>>>>> 1. users can use the `#describe` function to check the generated
> >>>>>>>>> topology before calling `KafkaStreams#start`, which is static
> >>>>>>>>> information.
> >>>>>>>>> 2. users can use `StreamsMetadata -> ThreadMetadata -> TaskMetadata`
> >>>>>>>>> programmatically after calling `KafkaStreams#start` to get the
> >>>>>>>>> dynamically changeable information.
> >>>>>>>>>
> >>>>>>>>> One thing I'm still not sure about, though, is that in `TaskMetadata`
> >>>>>>>>> we only have the TaskId and assigned partitions, whereas the
> >>>>>>>>> "TopologyDescription" introduced in KIP-120 simply describes the
> >>>>>>>>> whole topology, possibly composed of multiple sub-topologies. So it is
> >>>>>>>>> hard for users to tell which sub-topology is executed under which task
> >>>>>>>>> on the fly.
> >>>>>>>>>
> >>>>>>>>> Hence I'm wondering if we can expose the "sub-topology-id" (named
> >>>>>>>>> topicsGroupId internally) in TopologyDescription#Subtopology, and then
> >>>>>>>>> from the task id, which is essentially "sub-topology-id DASH
> >>>>>>>>> partition-group-id", users can make the link, though it is still not
> >>>>>>>>> that straightforward.
> >>>>>>>>>
> >>>>>>>>> Thoughts?
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois
> >>>>>>>>> <fhussonnois@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>    Thanks Guozhang for pointing me to KIP-120.
> >>>>>>>>>
> >>>>>>>>>    I've made some modifications to the KIP. I also proposed a new PR
> >>>>>>>>>    (there are still some tests to write).
> >>>>>>>>>
> >>>>>>>>>    https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> >>>>>>>>>
> >>>>>>>>>    Exposing consumed offsets through JMX is sufficient for debugging
> >>>>>>>>>    purposes. But I think this could be part of another JIRA, as there
> >>>>>>>>>    is no impact on the public API.
> >>>>>>>>>
> >>>>>>>>>    Thanks
> >>>>>>>>>
> >>>>>>>>>    2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangguoz@gmail.com>:
> >>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> Hello Florian,
> >>>>>>>>>>
> >>>>>>>>>> As for programmatically discovering monitoring data by piping metrics
> >>>>>>>>>> into a dedicated topic: I think you can actually use a
> >>>>>>>>>> KafkaMetricsReporter which pipes the polled metric values into a
> >>>>>>>>>> pre-defined topic (note that in Kafka the MetricsReporter is simply an
> >>>>>>>>>> interface, and users can build their own impl in addition to the
> >>>>>>>>>> JMXReporter), for example:
> >>>>>>>>>>
> >>>>>>>>>> https://github.com/krux/kafka-metrics-reporter
> >>>>>>>>>>
> >>>>>>>>>> As for the "static task-level assignment", what I meant is that the
> >>>>>>>>>> mapping from source-topic-partitions -> tasks is static, via the
> >>>>>>>>>> "PartitionGrouper", and a task won't switch from an active task to a
> >>>>>>>>>> standby task. Rather, an active task could be migrated, as a whole
> >>>>>>>>>> along with all its assigned partitions, to another thread / process,
> >>>>>>>>>> and a new standby task will be created on the host that this active
> >>>>>>>>>> task is migrating from. So for the SAME task, its
> >>>>>>>>>> taskMetadata.assignedPartitions() will always return the same
> >>>>>>>>>> partitions.
> >>>>>>>>>>
> >>>>>>>>>> As for the `toString` function that we have today, I feel it has some
> >>>>>>>>>> correlation with KIP-120, so I'm trying to coordinate some
> >>>>>>>>>> discussions here (cc'ing Matthias as the owner of KIP-120). My
> >>>>>>>>>> understanding is that:
> >>>>>>>>>>
> >>>>>>>>>> 1. In KIP-120, the `toString` function of `KafkaStreams` will be
> >>>>>>>>>> removed, and instead the `Topology#describe` function will be
> >>>>>>>>>> introduced for users to debug the topology BEFORE they start running
> >>>>>>>>>> their instance with the topology. Hence the description won't contain
> >>>>>>>>>> any task information, as tasks are not formed yet.
> >>>>>>>>>> 2. In KIP-130, we want to add the task-level information for
> >>>>>>>>>> monitoring purposes, which is not static and can only be captured
> >>>>>>>>>> AFTER the instance has started running. Again, I'm wondering for
> >>>>>>>>>> KIP-130 alone whether adding those metrics mentioned in my previous
> >>>>>>>>>> email would suffice even for the use case that you have mentioned.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois
> >>>>>>>>>> <fhussonnois@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you for your feedback. I've started to look more deeply into
> >>>>>>>>>>> the code. As you mention, it would be smarter to use the current
> >>>>>>>>>>> StreamMetadata API to expose this information.
> >>>>>>>>>>>
> >>>>>>>>>>> I think exposing metrics through JMX is great for building monitoring
> >>>>>>>>>>> dashboards using tools like jmxtrans and grafana.
> >>>>>>>>>>> But for our use case we would like to expose the states directly from
> >>>>>>>>>>> the application embedding the kstreams topologies.
> >>>>>>>>>>> So we expect to be able to retrieve states in a programmatic way.
> >>>>>>>>>>>
> >>>>>>>>>>> For instance, we could imagine producing those states into a
> >>>>>>>>>>> dedicated topic. That way, a third application could automatically
> >>>>>>>>>>> discover all kafka-streams applications which could be monitored.
> >>>>>>>>>>> In a production environment, that can clearly be a solution for
> >>>>>>>>>>> getting a complete overview of a microservices architecture based on
> >>>>>>>>>>> Kafka Streams.
> >>>>>>>>>>>
> >>>>>>>>>>> The toString() method gives a lot of information, but it can only be
> >>>>>>>>>>> used for debugging purposes, not to build a topology visualization
> >>>>>>>>>>> tool. Could we actually expose the same details about the stream
> >>>>>>>>>>> topology from the StreamMetadata API? The TaskMetadata class you have
> >>>>>>>>>>> suggested could then contain information similar to what is returned
> >>>>>>>>>>> by the toString method of the AbstractTask class.
> >>>>>>>>>>>
> >>>>>>>>>>> I can update the KIP in that way.
> >>>>>>>>>>>
> >>>>>>>>>>> Finally, I'm not sure I understand your last point: "Note that the
> >>>>>>>>>>> task-level assignment information is static, i.e. it will not change
> >>>>>>>>>>> during the runtime"
> >>>>>>>>>>>
> >>>>>>>>>>> Does that mean that when a rebalance occurs, new tasks are created for
> >>>>>>>>>>> the new assignments and old ones just switch to a standby state?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> 2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangguoz@gmail.com>:
> >>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> Hello Florian,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP and your detailed explanation of your use case. I
> >>>>>>>>>>>> think there are two dimensions to discuss on how to improve Streams'
> >>>>>>>>>>>> debuggability (or more specifically state exposure for
> >>>>>>>>>>>> visualization).
> >>>>>>>>>>>>
> >>>>>>>>>>>> The first question is "what information should we expose to the
> >>>>>>>>>>>> user". From your KIP I saw generally three categories:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. The state of the thread within a process. As you mentioned,
> >>>>>>>>>>>> currently we only expose the state of the process but not the
> >>>>>>>>>>>> finer-grained per-thread state.
> >>>>>>>>>>>> 2. The state of the task. Currently the closest API to this is
> >>>>>>>>>>>> StreamsMetadata; however, it aggregates the tasks across all threads
> >>>>>>>>>>>> and only presents the aggregated set of the assigned partitions /
> >>>>>>>>>>>> state stores etc. We can consider extending this with a new
> >>>>>>>>>>>> StreamsMetadata#tasks() which returns a TaskMetadata with similar
> >>>>>>>>>>>> fields; StreamsMetadata.stateStoreNames / etc would still return the
> >>>>>>>>>>>> aggregated results, but users can still "drill down" if they want.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The second question is "how should we expose them to the user". For
> >>>>>>>>>>>> example, you mentioned consumedOffsetsByPartition in the
> >>>>>>>>>>>> activeTasks. We could add this as a JMX metric based on fetch
> >>>>>>>>>>>> positions inside the consumer layer (note that Streams is just
> >>>>>>>>>>>> embedding consumers), or we could consider adding it into
> >>>>>>>>>>>> TaskMetadata. In either case it can be visualized for monitoring. The
> >>>>>>>>>>>> reason we expose StreamsMetadata as well as State was that they are
> >>>>>>>>>>>> expected to be "polled" in a programmatic way for interactive queries
> >>>>>>>>>>>> and also for control flows (e.g. I would like to start running my
> >>>>>>>>>>>> other topology ONLY once the first topology is up and running),
> >>>>>>>>>>>> while for your case it seems the main purpose is to continuously
> >>>>>>>>>>>> query them for monitoring etc. Personally I'd prefer to expose them
> >>>>>>>>>>>> as JMX only for such purposes, to have a simpler API.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So given your current motivations I'd suggest exposing the following
> >>>>>>>>>>>> information as newly added metrics in Streams:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Thread-level state metric.
> >>>>>>>>>>>> 2. Task-level hosted client identifier metric (e.g. host:port).
> >>>>>>>>>>>> 3. Consumer-level per-topic/partition position metric
> >>>>>>>>>>>> (https://kafka.apache.org/documentation/#topic_fetch_monitoring).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Note that the task-level assignment information is static, i.e. it
> >>>>>>>>>>>> will not change during the runtime at all and can be accessed from
> >>>>>>>>>>>> the `toString()` function already, even before the instance starts
> >>>>>>>>>>>> running, so I think this piece of information does not need to be
> >>>>>>>>>>>> exposed through JMX anymore.
> >>>>>>>>>>>>
> >>>>>>>>>>>> WDYT?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian.guy@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Florian,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It seems there is some overlap here with what we already have in
> >>>>>>>>>>>>> KafkaStreams.allMetadata(). This currently returns a
> >>>>>>>>>>>>> Collection<StreamsMetadata> where each StreamsMetadata instance
> >>>>>>>>>>>>> holds the state stores and partition assignment for every instance
> >>>>>>>>>>>>> of the KafkaStreams application. I'm wondering if that is good
> >>>>>>>>>>>>> enough for what you are trying to achieve? If not, could it be
> >>>>>>>>>>>>> modified to include the per-Thread assignment?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Damian
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois
> >>>>>>>>>>>>> <fhussonnois@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> First, I will answer your last question.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The main reason to have both TaskState#assignment and
> >>>>>>>>>>>>>> TaskState#consumedOffsetsByPartition is that tasks have no consumed
> >>>>>>>>>>>>>> offsets until at least one message is consumed for each partition,
> >>>>>>>>>>>>>> even if previous offsets exist for the consumer group.
> >>>>>>>>>>>>>> So yes, these methods are redundant, as they only diverge at
> >>>>>>>>>>>>>> application startup.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About the use case: currently we are developing for a customer a
> >>>>>>>>>>>>>> little framework based on KafkaStreams which transforms/denormalizes
> >>>>>>>>>>>>>> data before ingesting it into hadoop.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We have a cluster of workers (SpringBoot) which instantiate KStreams
> >>>>>>>>>>>>>> topologies dynamically based on dataflow configurations.
> >>>>>>>>>>>>>> Each configuration describes a topic to consume and how to process
> >>>>>>>>>>>>>> messages (this looks like the NiFi processors API).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Our architecture is inspired by KafkaConnect. We have topics for
> >>>>>>>>>>>>>> configs and states which are consumed by each worker (actually we
> >>>>>>>>>>>>>> have reused some internal classes of the connect API).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Now, we would like to develop UIs to visualize the topics and
> >>>>>>>>>>>>>> partitions consumed by our worker applications.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Also, I think it would be nice to be able, in the future, to develop
> >>>>>>>>>>>>>> web UIs similar to Spark's but for KafkaStreams to visualize
> >>>>>>>>>>>>>> DAGs... so maybe this KIP is just a first step.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matthias@confluent.io>:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I am wondering a little bit why you need to expose this
> >>>>>>>>>>>>>>> information. Can you describe some use cases?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Would it be worthwhile to unify this new API with
> >>>>>>>>>>>>>>> KafkaStreams#state() to get the overall state of an application
> >>>>>>>>>>>>>>> without the need to call two different methods? Not sure what
> >>>>>>>>>>>>>>> this unified API might look like, though.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> One minor comment about the API: TaskState#assignment seems to be
> >>>>>>>>>>>>>>> redundant. It should be the same as
> >>>>>>>>>>>>>>> TaskState#consumedOffsetsByPartition.keySet()
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Or am I missing something?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 3/1/17 5:19 AM, Florian Hussonnois wrote:
> >>>>>>>>>>>>>>>> Hi Eno,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Yes, but the state() method only returns the global state of the
> >>>>>>>>>>>>>>>> KafkaStream application (i.e., CREATED, RUNNING, REBALANCING,
> >>>>>>>>>>>>>>>> PENDING_SHUTDOWN, NOT_RUNNING).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> An alternative to this KIP would be to change this method to
> >>>>>>>>>>>>>>>> return more information instead of adding a new method.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.thereska@gmail.com>:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks Florian,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Have you had a chance to look at the new state methods in
> >>>>>>>>>>>>>>>>> 0.10.2, e.g., KafkaStreams.state()?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>>>>>> On 1 Mar 2017, at 11:54, Florian Hussonnois
> >>>>>>>>>>>>>>>>>> <fhussonnois@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I have just created KIP-130 to add a new method to the
> >>>>>>>>>>>>>>>>>> KafkaStreams API in order to expose the states of threads and
> >>>>>>>>>>>>>>>>>> active tasks.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Florian HUSSONNOIS
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> Florian HUSSONNOIS
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Florian HUSSONNOIS
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>    --
> >>>>>>>>>    Florian HUSSONNOIS
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>        --
> >>>>>>>>        -- Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>    --
> >>>>>>>>    Florian HUSSONNOIS
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Florian HUSSONNOIS
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>


-- 
-- Guozhang
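
On the earlier point in the quoted thread that `TaskState#assignment` and
`TaskState#consumedOffsetsByPartition.keySet()` only diverge at application
startup, the following is a small sketch of that behavior. The `TaskState`
class here is a hypothetical stand-in modeled on the names used in the early
KIP draft, not a Kafka Streams class, and partitions are represented as plain
strings for brevity.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class AssignmentVsConsumed {

    // Hypothetical stand-in for the TaskState proposed in the KIP draft;
    // it is not a Kafka Streams class. Partitions are plain strings here.
    public static final class TaskState {
        private final Set<String> assignment;
        private final Map<String, Long> consumedOffsetsByPartition = new HashMap<>();

        public TaskState(Set<String> assignment) {
            this.assignment = new TreeSet<>(assignment);
        }

        // Records the offset of the latest message consumed from a partition.
        public void recordConsumed(String partition, long offset) {
            if (!assignment.contains(partition)) {
                throw new IllegalArgumentException("Not assigned: " + partition);
            }
            consumedOffsetsByPartition.put(partition, offset);
        }

        public Set<String> assignment() {
            return Collections.unmodifiableSet(assignment);
        }

        public Map<String, Long> consumedOffsetsByPartition() {
            return Collections.unmodifiableMap(consumedOffsetsByPartition);
        }

        // Assigned partitions from which nothing has been consumed yet.
        public Set<String> notYetConsumed() {
            Set<String> pending = new TreeSet<>(assignment);
            pending.removeAll(consumedOffsetsByPartition.keySet());
            return pending;
        }
    }

    public static void main(String[] args) {
        TaskState task = new TaskState(new TreeSet<>(Arrays.asList("in-0", "in-1")));
        System.out.println(task.notYetConsumed());                      // prints [in-0, in-1]
        task.recordConsumed("in-0", 42L);
        System.out.println(task.consumedOffsetsByPartition().keySet()); // prints [in-0]
        System.out.println(task.notYetConsumed());                      // prints [in-1]
    }
}
```

Until every assigned partition has delivered at least one message, the key set
of the consumed offsets is a strict subset of the assignment, which is the
startup gap described in the thread.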
