kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Adam Bellemare <adam.bellem...@gmail.com>
Subject Re: KIP-382 + Kafka Streams Question
Date Tue, 23 Jul 2019 18:03:39 GMT
Hi Ryanne

Thanks for the clarifications! Here is one of my own, as I think it's the
biggest stumbling block in my description:

*> What is "table" exactly? I am interpreting this as a KTable changelog
topic*
"table" is not a KTable changelog topic, but simply entity data that is to
be materialized into a table - for example, relational data captured from
Kafka Connect. I should have named this "stateful-data" or something less
ambiguous and provided an explicit definition. Note that non-KStreams
applications will also regularly use this entity data to materialize their
own tables, but it in itself is not a KTable internal changelog.

Per your example 1, let's name this topic "userEntity". It could be a
(key,value) pair of (userId, emailAddress), where I only want the latest
emailAddress (basic materialization) to send an email on account password
update. I only want to run the application against one Kafka cluster, and
because I don't want to use dual-ingest, I am running that application only
on the cluster where the data is being sent (Primary Cluster). In a
scenario where all replication is working correctly I could also run this
off the Secondary cluster's replica, "primary.userEntity"



*> Yes, that's something like "dual ingest", which I would not recommend.*
Agreed. I do not want to use dual ingest.

*> Secondary cluster:*
*> Topics: events, primary.events, table-changelog*
*> App subscription: events, primary.events*
*> App output: table-changelog*

Is the "events" topic dual ingest, since it exists in the Primary cluster
with the exact same name?

The whole scenario can be boiled down into the following:
1) Entity data is in a userEntity topic, ie: (userId, emailAddress)
2) I want to publish it into an Active-Active cluster setup without using
dual-ingest
3) I want to materialize the data into a single table for an application
consuming from a single cluster (Kafka Streams or not)
4) I want to be able to fail over and rebuild the materialized state using
the data I have replicated.
- If all of the entity data is produced to each cluster (dual-ingest) than
it is trivial to fail over and rebuild the materialized table.
- If the data is only produced to Primary and only replicated to Secondary,
at a failover I would need to consume from the replicated topic.
*    Q1) Where does the producer write its data to if the primary cluster
is dead?*
        It seems to me that it must then write its data to the only
remaining cluster. This would then put the entity data in two topics as I
had originally outlined, as below:
*        Secondary Cluster: (Live)   (renamed table to userEntity)*
          Topic: "primary.userEntity" (contains data from T = 0 to T = n)
          Topic: "userEntity" (contains data from T = n+1 to now, the
failed-over producer)


*    Q2) How does a Kafka Streams application materialize state from two
topics? (loaded question, I know)*
          Since I know this isn't built in, is there some sort of technique
or system that you use to allow for a single virtual topic made up of many
logical topics?

*Q3) Do you have any recommendations on how to handle replication/producing
of entity-data (ie: userEntity) across multiple clusters, such that an
application may correctly (or even near-correctly) materialize state after
a failover like the one I described above?*
This is really the golden question. We're currently developing our
Active-Passive approach, but we want to be prepared for scenarios where we
have multiple clusters with entity-replication between clusters.


Thanks Ryanne!


On Tue, Jul 23, 2019 at 12:39 PM Ryanne Dolan <ryannedolan@gmail.com> wrote:

> Adam,
>
> > I think we have inconsistent definitions of Active-Active
>
> Yes, this terminology gets thrown around a lot. IMO "active" means both
> producers and consumers are using a cluster under normal operation -- not
> just during outages, and not just by something like MM2. (Obviously, MM2
> has producers and consumers, but they don't count here.) Conversely,
> "standby" or "backup" means that data is being written by a producer, but
> it isn't being consumed under normal operation. I qualify this definition
> with IMO, as I don't think there is strong consensus here.
>
> I'll also add a caveat about "under normal operation". An active/active
> architecture does not necessarily mean that you use both clusters in the
> same way all the time -- only that you _could_. You could load-balance
> 50/50 of your traffic between two clusters, or you could direct 100% to one
> and 0% to the other, e.g. if one is farther away or has less hw resources.
> But the architecture remains the same (and certainly, MM2 doesn't care
> about this detail).
>
> > The producer is only producing to one cluster (primary) and one topic
> (topic "table"), and the other cluster (secondary) contains only a
> replication of the data via MM2 ("primary.table").
>
> That, by definition, is not active/active.
>
> >What you seemed to be proposing is that the producer's "table" data is
> sent fully to each cluster, such that the state can be materialized as a
> KTable in each application running on each cluster.
>
> Correct.
>
> > This wouldn't require MM2 at all, so I'm not sure if this is what you
> advocated.
>
> You could use a dual-ingest method and send all your data to both
> clusters, which would not require MM2. There are many issues with this
> approach, primarily wrt to consistency and efficiency.
>
> > The trivial solution seems to be to make your producers produce all
> stateful data (topic "table") to each cluster, which makes MM2 unnecessary,
> but can also lead to data inconsistencies so it's not exactly foolproof.
>
> Yes, that's something like "dual ingest", which I would not recommend.
>
> > SteamsAppPrimary is consuming from ("table")
>
> What is "table" exactly? I am interpreting this as a KTable changelog
> topic, in which case "table" is an output topic of some streams app, i.e.
> the app producing the change events. _This_ is the app I mean to suggest
> you run on both clusters. Then, "table" will appear on both clusters (no
> "primary.table").
>
> The app that is creating the "table" changelog would be processing events
> from some other topic, say "events". Then, this is what I recommend:
>
> Primary cluster:
> Topics: events, secondary.events, table-changelog
> App subscription: events, secondary.events
> App output: table-changelog
>
> Secondary cluster:
> Topics: events, primary.events, table-changelog
> App subscription: events, primary.events
> App output: table-changelog
>
> With this arrangement, the app on either cluster will have built up state
> in RocksDB based on events from both clusters.
>
> Now, it seems you also want a second app to process this changelog. I can
> see a few scenarios:
>
> 1) you want to take some external action based on records in the table
> changelog, e.g. to send an email every time a password is updated. In this
> case, you don't want this app running in both clusters, as you'd get two
> emails. So you could run it in one cluster and use offset translation to
> migrate during failover. The send-email app is stateless, so you just need
> to translate and reset offsets (there is no internal state to rebuild).
>
> 2) you want to use the table changelog in a stateful but non-effecting
> way, e.g. by keeping a running count of records. This app, like the first,
> can be run in both clusters.
>
> 3) you want some combination of state and external actions in one big app.
> In this case, I'd consider splitting your app in two so that you can built
> state in both clusters while effecting external actions in only one cluster
> at a time.
>
> Lemme know if that makes sense.
>
> Ryanne
>
> On Tue, Jul 23, 2019 at 10:19 AM Adam Bellemare <adam.bellemare@gmail.com>
> wrote:
>
>> Hi Ryanne
>>
>> I think we have inconsistent definitions of Active-Active. The producer
>> is only producing to one cluster (primary) and one topic (topic "table"),
>> and the other cluster (secondary) contains only a replication of the data
>> via MM2 ("primary.table"). What you seemed to be proposing is that the
>> producer's "table" data is sent fully to each cluster, such that the state
>> can be materialized as a KTable in each application running on each
>> cluster. This wouldn't require MM2 at all, so I'm not sure if this is what
>> you advocated.
>>
>> You also state that "As with normal consumers, the Streams app should *subscribe
>> to any remote topics*, e.g. with a regex, s.t. the application state
>> will reflect input from either source cluster.". Wouldn't this mean that
>> the stateful "table" topic that we wish to materialize would be replicated
>> by MM2 from Primary, such that we end up with the following:
>>
>> *Replicated Entity/Stateful Data:*
>> *Primary Cluster: (Live)*
>> Topic: "table" (contains data from T = 0 to T = n)
>> SteamsAppPrimary is consuming from ("table")
>>
>> *Secondary Cluster: (Live)*
>> Topic: "primary.table" (contains data from T = 0 to T = n)
>> SteamsAppSecondary is consuming from ("primary.table")
>>
>> What does StreamsAppSecondary do when "primary.table" is no longer
>> replicated because Primary has died? Additionally, where should the
>> producer of topic "table" now write its data to, assuming that Primary
>> Cluster is irrevocably lost?
>>
>> I hope this better outlines my scenario. The trivial solution seems to be
>> to make your producers produce all stateful data (topic "table") to each
>> cluster, which makes MM2 unnecessary, but can also lead to data
>> inconsistencies so it's not exactly foolproof.
>>
>> Thanks
>>
>> On Mon, Jul 22, 2019 at 6:32 PM Ryanne Dolan <ryannedolan@gmail.com>
>> wrote:
>>
>>> Hello Adam, thanks for the questions. Yes my organization uses Streams,
>>> and yes you can use Streams with MM2/KIP-382, though perhaps not in the way
>>> you are describing.
>>>
>>> The architecture you mention is more "active/standby" than
>>> "active/active" IMO. The "secondary" cluster is not being used until a
>>> failure, at which point you migrate your app and expect the data to already
>>> be there. This works for normal consumers where you can seek() and
>>> --reset-offsets. Streams apps can be reset with the
>>> kafka-streams-application-reset tool, but as you point out, that doesn't
>>> help with rebuilding an app's internal state, which would be missing on the
>>> secondary cluster. (Granted, that may be okay depending on your particular
>>> application.)
>>>
>>> A true "active/active" solution IMO would be to run your same Streams
>>> app in _both_ clusters (primary, secondary), s.t. the entire application
>>> state is available and continuously updated in both clusters. As with
>>> normal consumers, the Streams app should subscribe to any remote topics,
>>> e.g. with a regex, s.t. the application state will reflect input from
>>> either source cluster.
>>>
>>> This is essentially what Streams' "standby replicas" are -- extra copies
>>> of application state to support quicker failover. Without these replicas,
>>> Streams would need to start back at offset 0 and re-process everything in
>>> order to rebuild state (which you don't want to do during a disaster,
>>> especially!). The same logic applies to using Streams with MM2. You _could_
>>> failover by resetting the app and rebuilding all the missing state, or you
>>> could have a copy of everything sitting there ready when you need it. The
>>> easiest way to do the latter is to run your app in both clusters.
>>>
>>> Hope that helps.
>>>
>>> Ryanne
>>>
>>> On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare <adam.bellemare@gmail.com>
>>> wrote:
>>>
>>>> Hi Ryanne
>>>>
>>>> I have a quick question for you about Active+Active replication and
>>>> Kafka Streams. First, does your org /do you use Kafka Streams? If not then
>>>> I think this conversation can end here. ;)
>>>>
>>>> Secondly, and for the broader Kafka Dev group - what happens if I want
>>>> to use Active+Active replication with my Kafka Streams app, say, to
>>>> materialize a simple KTable? Based on my understanding, I topic "table" on
>>>> the primary cluster will be replicated to the secondary cluster as
>>>> "primary.table". In the case of a full cluster failure for primary, the
>>>> producer to topic "table" on the primary switches over to the secondary
>>>> cluster, creates its own "table" topic and continues to write to there. So
>>>> now, assuming we have had no data loss, we end up with:
>>>>
>>>>
>>>> *Primary Cluster: (Dead)*
>>>>
>>>>
>>>> *Secondary Cluster: (Live)*
>>>> Topic: "primary.table" (contains data from T = 0 to T = n)
>>>> Topic: "table" (contains data from T = n+1 to now)
>>>>
>>>> If I want to materialize state from using Kafka Streams, obviously I am
>>>> now in a bit of a pickle since I need to consume "primary.table" before I
>>>> consume "table". Have you encountered rebuilding state in Kafka Streams
>>>> using Active-Active? For non-Kafka Streams I can see using a single
>>>> consumer for "primary.table" and one for "table", interleaving the
>>>> timestamps and performing basic event dispatching based on my own tracked
>>>> stream-time, but for Kafka Streams I don't think there exists a solution
to
>>>> this.
>>>>
>>>> If you have any thoughts on this or some recommendations for Kafka
>>>> Streams with Active-Active I would be very appreciative.
>>>>
>>>> Thanks
>>>> Adam
>>>>
>>>>
>>>>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message